Skip to content

Commit 2783aff

Browse files
committed
Use extreme with backpressure
1 parent 8b8d52b commit 2783aff

File tree

7 files changed

+64
-46
lines changed

7 files changed

+64
-46
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ The format is based on [Keep a
66
Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
77
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
88

9+
## 1.1.0 - 2024-06-18
10+
11+
- Use new `Extreme.ListenerWithBackPressure`
12+
913
## 1.0.1 - 2024-01-31
1014

1115
- Bump all dependencies including bumping `:extreme` to v1.0.5 which fixes a

config/test.exs

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import Config
22

3+
config :logger, :console,
4+
format: "$time $metadata[$level] $message\n",
5+
level: :debug,
6+
metadata: [:pid, :module, :function]
7+
8+
config :ex_unit, capture_log: true
9+
310
config :kelvin, ExtremeClient,
411
db_type: :node,
512
host: System.get_env("EVENTSTORE_HOST") || "localhost",

lib/kelvin/in_order_subscription.ex

+27-29
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ defmodule Kelvin.InOrderSubscription do
3535

3636
defstruct [
3737
:config,
38-
:subscription,
38+
:extreme_listener,
3939
:self,
4040
:max_buffer_size,
4141
demand: 0,
@@ -56,7 +56,31 @@ defmodule Kelvin.InOrderSubscription do
5656
Application.get_env(:kelvin, :catch_up_chunk_size, 256)
5757
)
5858

59+
connection = Keyword.fetch!(opts, :connection)
60+
stream_name = Keyword.fetch!(opts, :stream_name)
61+
62+
listener_name =
63+
opts
64+
|> Keyword.get(:name, __MODULE__)
65+
|> Module.concat(ExtremeListener)
66+
67+
{:ok, extreme_listener} =
68+
Kelvin.Listener.start_link(connection, stream_name,
69+
read_per_page: max_buffer_size,
70+
auto_subscribe: false,
71+
ack_timeout: :infinity,
72+
name: listener_name,
73+
producer: self(),
74+
get_stream_position_fun: fn ->
75+
opts
76+
|> Keyword.fetch!(:restore_stream_position!)
77+
|> _do_function()
78+
|> Kernel.+(1)
79+
end
80+
)
81+
5982
state = %__MODULE__{
83+
extreme_listener: extreme_listener,
6084
config: Map.new(opts),
6185
self: Keyword.get(opts, :name, self()),
6286
max_buffer_size: max_buffer_size
@@ -92,23 +116,9 @@ defmodule Kelvin.InOrderSubscription do
92116
end
93117

94118
def handle_info(:subscribe, state) do
95-
if state.subscription do
96-
# coveralls-ignore-start
97-
Logger.warn("#{inspect(__MODULE__)} is already subscribed.")
98-
# coveralls-ignore-stop
99-
else
100-
case _subscribe(state) do
101-
{:ok, sub} ->
102-
Process.link(sub)
103-
{:noreply, [], put_in(state.subscription, sub)}
104-
105-
# coveralls-ignore-start
106-
{:error, reason} ->
107-
{:stop, reason, state}
119+
Kelvin.Listener.subscribe(state.extreme_listener)
108120

109-
# coveralls-ignore-stop
110-
end
111-
end
121+
{:noreply, [], state}
112122
end
113123

114124
def handle_info(_info, state), do: {:noreply, [], state}
@@ -162,18 +172,6 @@ defmodule Kelvin.InOrderSubscription do
162172
end
163173
end
164174

165-
defp _subscribe(state) do
166-
state.config.connection
167-
|> Extreme.RequestManager._name()
168-
|> GenServer.call(
169-
{:read_and_stay_subscribed, self(),
170-
{state.config.stream_name,
171-
_do_function(state.config.restore_stream_position!) + 1,
172-
state.max_buffer_size, true, false, :infinity}},
173-
:infinity
174-
)
175-
end
176-
177175
defp _do_function(func) when is_function(func, 0), do: func.()
178176

179177
defp _do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do

lib/kelvin/listener.ex

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
defmodule Kelvin.Listener do
2+
use Extreme.ListenerWithBackPressure
3+
4+
def on_init(opts) do
5+
state = %{
6+
producer: Keyword.fetch!(opts, :producer),
7+
get_stream_position_fun: Keyword.fetch!(opts, :get_stream_position_fun)
8+
}
9+
10+
{:ok, state}
11+
end
12+
13+
defp get_last_event(_stream_name, %{} = state),
14+
do: state.get_stream_position_fun.()
15+
16+
defp process_push(push, _stream_name, %{} = state),
17+
do: GenServer.call(state.producer, {:on_event, push})
18+
end

mix.exs

+2-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ defmodule Kelvin.MixProject do
4545
defp deps do
4646
[
4747
{:gen_stage, "~> 1.0"},
48-
{:extreme, "~> 1.1.0-rc1"},
48+
# {:extreme, "~> 1.1.0-rc1"},
49+
{:extreme, github: "exponentially/extreme", branch: "event-producer"},
4950
# docs
5051
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
5152
# test

mix.lock

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
88
"excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"},
99
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
10-
"extreme": {:hex, :extreme, "1.1.0-rc1", "43ee836564d7e647631e06a087a865a9096b21ec9ade0c3db86dde4dd83f32e4", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fdd0528b488c77f337ffd1d5053cea0e9e1f9731e42d8ddcc659bb6b43ae2328"},
10+
"extreme": {:git, "https://github.com/exponentially/extreme.git", "9cc7438a4f3659af92f081731f545b9a2e18638a", [branch: "event-producer"]},
1111
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
1212
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
1313
"gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"},

test/kelvin/in_order_subscription_test.exs

+5-15
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
defmodule Kelvin.InOrderSubscriptionTest do
22
use ExUnit.Case, async: true
33

4-
@moduletag :capture_log
5-
64
alias Extreme.Messages
75

86
setup do
@@ -14,7 +12,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
1412

1513
describe "given events have been written to a stream" do
1614
setup c do
17-
write_events(0..100, c.stream_name)
15+
_write_events(0..100, c.stream_name)
1816
:ok
1917
end
2018

@@ -33,7 +31,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
3331
assert event.event.data == to_string(n)
3432
end
3533

36-
write_events(101..200, c.stream_name)
34+
_write_events(101..200, c.stream_name)
3735

3836
for n <- 101..200 do
3937
assert_receive {:events, [event]}, 1_000
@@ -65,15 +63,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
6563

6664
assert_receive {:DOWN, ^monitor_ref, _, _, _}
6765

68-
# we're hardcoding the restore_stream_position! function so this will
69-
# restart from 0 instead of the current stream position as would be the
70-
# case in a real-life system
71-
for n <- 0..100 do
72-
assert_receive {:events, [event]}, 10_000
73-
assert event.event.data == to_string(n)
74-
end
75-
76-
write_events(101..200, c.stream_name)
66+
_write_events(101..200, c.stream_name)
7767

7868
for n <- 101..200 do
7969
assert_receive {:events, [event]}, 1_000
@@ -84,7 +74,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
8474

8575
describe "given only a few events have been written to a stream" do
8676
setup c do
87-
write_events(0..10, c.stream_name)
77+
_write_events(0..10, c.stream_name)
8878
:ok
8979
end
9080

@@ -113,7 +103,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
113103

114104
defp restore_stream_position!, do: -1
115105

116-
defp write_events(range, stream) do
106+
defp _write_events(range, stream) do
117107
range
118108
|> Enum.map(fn n ->
119109
Messages.NewEvent.new(

0 commit comments

Comments
 (0)