@@ -35,7 +35,7 @@ defmodule Kelvin.InOrderSubscription do
35
35
36
36
defstruct [
37
37
:config ,
38
- :subscription ,
38
+ :extreme_listener ,
39
39
:self ,
40
40
:max_buffer_size ,
41
41
demand: 0 ,
@@ -53,13 +53,36 @@ defmodule Kelvin.InOrderSubscription do
53
53
Keyword . get (
54
54
opts ,
55
55
:catch_up_chunk_size ,
56
- Application . get_env ( :kelvin , :catch_up_chunk_size , 256 )
56
+ Application . get_env ( :kelvin , :catch_up_chunk_size , 128 )
57
+ )
58
+
59
+ connection = Keyword . fetch! ( opts , :connection )
60
+ stream_name = Keyword . fetch! ( opts , :stream_name )
61
+
62
+ listener_name =
63
+ opts
64
+ |> Keyword . get ( :name , __MODULE__ )
65
+ |> Module . concat ( ExtremeListener )
66
+
67
+ { :ok , extreme_listener } =
68
+ Kelvin.Listener . start_link ( connection , stream_name ,
69
+ read_per_page: max_buffer_size ,
70
+ auto_subscribe: false ,
71
+ ack_timeout: :infinity ,
72
+ name: listener_name ,
73
+ producer: self ( ) ,
74
+ get_stream_position_fun: fn ->
75
+ opts
76
+ |> Keyword . fetch! ( :restore_stream_position! )
77
+ |> _do_function ( )
78
+ end
57
79
)
58
80
59
81
state = % __MODULE__ {
82
+ extreme_listener: extreme_listener ,
60
83
config: Map . new ( opts ) ,
61
84
self: Keyword . get ( opts , :name , self ( ) ) ,
62
- max_buffer_size: max_buffer_size
85
+ max_buffer_size: max_buffer_size * 2
63
86
}
64
87
65
88
Process . send_after (
@@ -75,7 +98,7 @@ defmodule Kelvin.InOrderSubscription do
75
98
def handle_info ( :check_auto_subscribe , state ) do
76
99
identifier = "#{ inspect ( __MODULE__ ) } (#{ inspect ( state . self ) } )"
77
100
78
- if do_function ( state . config . subscribe_on_init? ) do
101
+ if _do_function ( state . config . subscribe_on_init? ) do
79
102
Logger . info ( "#{ identifier } subscribing to '#{ state . config . stream_name } '" )
80
103
81
104
GenStage . async_info ( self ( ) , :subscribe )
@@ -92,37 +115,21 @@ defmodule Kelvin.InOrderSubscription do
92
115
end
93
116
94
117
def handle_info ( :subscribe , state ) do
95
- if state . subscription do
96
- # coveralls-ignore-start
97
- Logger . warn ( "#{ inspect ( __MODULE__ ) } is already subscribed." )
98
- # coveralls-ignore-stop
99
- else
100
- case subscribe ( state ) do
101
- { :ok , sub } ->
102
- Process . link ( sub )
103
- { :noreply , [ ] , put_in ( state . subscription , sub ) }
104
-
105
- # coveralls-ignore-start
106
- { :error , reason } ->
107
- { :stop , reason , state }
118
+ Kelvin.Listener . subscribe ( state . extreme_listener )
108
119
109
- # coveralls-ignore-stop
110
- end
111
- end
120
+ { :noreply , [ ] , state }
112
121
end
113
122
114
- def handle_info ( _info , state ) , do: { :noreply , [ ] , state }
115
-
116
123
@ impl GenStage
117
124
def handle_call ( { :on_event , event } , from , state ) do
118
125
# when the current demand is 0, we should
119
126
case state do
120
127
% { demand: 0 , buffer_size: size , max_buffer_size: max }
121
128
when size + 1 == max ->
122
- { :noreply , [ ] , enqueue ( state , { event , from } ) }
129
+ { :noreply , [ ] , _enqueue ( state , { event , from } ) }
123
130
124
131
% { demand: 0 } ->
125
- { :reply , :ok , [ ] , enqueue ( state , event ) }
132
+ { :reply , :ok , [ ] , _enqueue ( state , event ) }
126
133
127
134
% { demand: demand } ->
128
135
{ :reply , :ok , [ { state . self , event } ] , put_in ( state . demand , demand - 1 ) }
@@ -131,26 +138,26 @@ defmodule Kelvin.InOrderSubscription do
131
138
132
139
@ impl GenStage
133
140
def handle_demand ( demand , state ) do
134
- dequeue_events ( state , demand , [ ] )
141
+ _dequeue_events ( state , demand , [ ] )
135
142
end
136
143
137
- defp dequeue_events ( % { buffer_size: size } = state , demand , events )
144
+ defp _dequeue_events ( % { buffer_size: size } = state , demand , events )
138
145
when size == 0 or demand == 0 do
139
146
{ :noreply , :lists . reverse ( events ) , put_in ( state . demand , demand ) }
140
147
end
141
148
142
- defp dequeue_events ( state , demand , events ) do
143
- case dequeue ( state ) do
149
+ defp _dequeue_events ( state , demand , events ) do
150
+ case _dequeue ( state ) do
144
151
{ { :value , { event , from } } , state } ->
145
152
GenStage . reply ( from , :ok )
146
- dequeue_events ( state , demand - 1 , [ { state . self , event } | events ] )
153
+ _dequeue_events ( state , demand - 1 , [ { state . self , event } | events ] )
147
154
148
155
{ { :value , event } , state } ->
149
- dequeue_events ( state , demand - 1 , [ { state . self , event } | events ] )
156
+ _dequeue_events ( state , demand - 1 , [ { state . self , event } | events ] )
150
157
end
151
158
end
152
159
153
- defp dequeue ( state ) do
160
+ defp _dequeue ( state ) do
154
161
case :queue . out ( state . buffer ) do
155
162
# coveralls-ignore-start
156
163
{ :empty , buffer } ->
@@ -162,25 +169,13 @@ defmodule Kelvin.InOrderSubscription do
162
169
end
163
170
end
164
171
165
- defp subscribe ( state ) do
166
- state . config . connection
167
- |> Extreme.RequestManager . _name ( )
168
- |> GenServer . call (
169
- { :read_and_stay_subscribed , self ( ) ,
170
- { state . config . stream_name ,
171
- do_function ( state . config . restore_stream_position! ) + 1 ,
172
- state . max_buffer_size , true , false , :infinity } } ,
173
- :infinity
174
- )
175
- end
176
-
177
- defp do_function ( func ) when is_function ( func , 0 ) , do: func . ( )
172
+ defp _do_function ( func ) when is_function ( func , 0 ) , do: func . ( )
178
173
179
- defp do_function ( { m , f , a } ) when is_atom ( m ) and is_atom ( f ) and is_list ( a ) do
174
+ defp _do_function ( { m , f , a } ) when is_atom ( m ) and is_atom ( f ) and is_list ( a ) do
180
175
apply ( m , f , a )
181
176
end
182
177
183
- defp enqueue ( state , element ) do
178
+ defp _enqueue ( state , element ) do
184
179
% {
185
180
state
186
181
| buffer: :queue . in ( element , state . buffer ) ,
0 commit comments