Commit c54a4f2
allow erlmld_worker:ready/1 callback to checkpoint (#16)
* allow `erlmld_worker:ready/1` callback to checkpoint
* new `erlmld_flusher:heartbeat/1` callback
1 parent f566fb1 commit c54a4f2

File tree: 4 files changed (+50 -11 lines)
src/erlmld_batch_processor.erl

Lines changed: 11 additions & 2 deletions
@@ -105,8 +105,17 @@ initialize(Opts, ShardId, ISN) ->
     {ok, update_watchdog(State)}.


-ready(State) ->
-    {ok, update_watchdog(State)}.
+ready(#state{flusher_mod = FMod, flusher_state = FState} = State) ->
+    {ok, NFState, Tokens} = FMod:heartbeat(FState),
+    NState = flusher_state(State, NFState),
+    NNState =
+        case Tokens of
+            [] ->
+                NState;
+            _ ->
+                note_success(note_flush(NState), Tokens)
+        end,
+    maybe_checkpoint(update_watchdog(NNState)).


 process_record(#state{last_flush_time = LastFlush,

src/erlmld_flusher.erl

Lines changed: 16 additions & 0 deletions
@@ -29,6 +29,18 @@
 %%% The batch processor handles checkpointing and decides when to trigger
 %%% flushing.
 %%%
+%%% heartbeat/1 will be called regardless of whether any records could be
+%%% obtained from the stream. It may return the same values as flush/2.
+%%% If it returns a non-empty list of tokens as the third tuple element, it
+%%% is considered to have just performed a partial flush. This allows a
+%%% flusher to flush even if no records were actually available on the
+%%% stream (e.g., after a period of time has elapsed), avoiding potential
+%%% near-deadlock situations which would only be resolved by additional
+%%% stream records appearing (where the batch processor is waiting for
+%%% tokens from the flusher before checkpointing, but the flusher is
+%%% waiting for more records from the batch processor before producing
+%%% tokens via flushing).
+%%%
 %%% @end
 %%% Created : 20 Dec 2016 by Constantin Berzan <[email protected]>

@@ -47,3 +59,7 @@
 -callback flush(flusher_state(), partial | full) ->
     {ok, flusher_state(), list(flusher_token())}
     | {error, term()}.
+
+-callback heartbeat(flusher_state()) ->
+    {ok, flusher_state(), list(flusher_token())}
+    | {error, term()}.
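For illustration (not part of this commit), a minimal sketch of a flusher implementing the new callback; the module name, record fields, threshold, and write_downstream/1 sink are all assumptions. It performs a partial flush when buffered records have gone stale, so the batch processor can obtain tokens and checkpoint even on a quiet stream:

    -module(example_flusher).

    -export([flush/2, heartbeat/1]).

    %% buffer holds {Token, Record} pairs (accumulated by the flusher's
    %% other callbacks, elided here); oldest_ms is the arrival time of the
    %% oldest buffered record.
    -record(state, {buffer = [], oldest_ms}).

    -define(MAX_AGE_MS, 5000).  %% illustrative staleness threshold

    %% flush/2: write the buffered records out and return their tokens.
    flush(#state{buffer = []} = State, _Kind) ->
        {ok, State, []};
    flush(#state{buffer = Buffer} = State, _Kind) ->
        ok = write_downstream([Rec || {_Tok, Rec} <- lists:reverse(Buffer)]),
        Tokens = [Tok || {Tok, _Rec} <- Buffer],
        {ok, State#state{buffer = [], oldest_ms = undefined}, Tokens}.

    %% heartbeat/1: called on every batch, even an empty one; returns the
    %% same values as flush/2. A non-empty token list counts as a partial
    %% flush.
    heartbeat(#state{buffer = []} = State) ->
        {ok, State, []};
    heartbeat(#state{oldest_ms = OldestMs} = State) ->
        case erlang:monotonic_time(millisecond) - OldestMs >= ?MAX_AGE_MS of
            true  -> flush(State, partial);
            false -> {ok, State, []}
        end.

    write_downstream(_Records) -> ok.  %% placeholder for the real sink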

src/erlmld_worker.erl

Lines changed: 6 additions & 3 deletions
@@ -26,9 +26,11 @@
 %%% at the most recent sequence number.
 %%%
 %%% Before starting to process each batch of records, a worker's ready/1 callback is
-%%% called, which should return a possibly-updated worker state. This can be useful
-%%% when a record processor is using a watchdog timer and is far behind on a stream
-%%% (and so won't receive any actual records for a while).
+%%% called, which should return a possibly-updated worker state and possibly a
+%%% checkpoint. This can be useful when a record processor is using a watchdog timer
+%%% and is far behind on a stream (and so won't receive any actual records for a
+%%% while), or if a stream has very low volume (records seen less frequently than
+%%% desired checkpoint or flush intervals).
 %%%
 %%% When a shard lease has been lost or a shard has been completely processed, a worker
 %%% will be shut down. If the lease was lost, the worker will receive a reason of
@@ -56,6 +58,7 @@

 -callback ready(worker_state()) ->
     {ok, worker_state()}
+    | {ok, worker_state(), checkpoint()}
     | {error, term()}.

 -callback process_record(worker_state(), stream_record()) ->
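For illustration (not part of this commit), a sketch of a worker using the new return shape on a low-volume stream; the record, interval, and make_checkpoint/1 helper are hypothetical, and a real worker would build the checkpoint() value using erlmld's actual types:

    %% State kept by the worker; both fields are illustrative.
    -record(wstate, {last_checkpoint_ms = 0,  %% when we last checkpointed
                     last_seen_sn}).          %% last processed sequence number

    -define(CHECKPOINT_INTERVAL_MS, 60000).   %% illustrative interval

    ready(#wstate{last_checkpoint_ms = LastMs, last_seen_sn = SN} = State)
      when SN /= undefined ->
        NowMs = erlang:monotonic_time(millisecond),
        case NowMs - LastMs >= ?CHECKPOINT_INTERVAL_MS of
            true ->
                %% quiet stream: checkpoint now instead of waiting for
                %% enough records to arrive and trigger a flush.
                {ok, State#wstate{last_checkpoint_ms = NowMs}, make_checkpoint(SN)};
            false ->
                {ok, State}
        end;
    ready(State) ->
        {ok, State}.

    %% hypothetical helper: a real worker would construct the checkpoint()
    %% value using erlmld's actual types rather than this placeholder tuple.
    make_checkpoint(SN) -> {checkpoint, SN}.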

src/erlmld_wrk_statem.erl

Lines changed: 17 additions & 6 deletions
@@ -325,20 +325,31 @@ handle_event(?INTERNAL, #{<<"action">> := <<"shutdownRequested">>} = R,
 %% callback, then deaggregate them all at once if they are in KPL format, and then provide
 %% each in turn to the handler module, which will have the opportunity to checkpoint after
 %% each record. the MLD should wait for our "status" response before sending any
-%% additional records or other requests.
+%% additional records or other requests. if the worker returned a checkpoint response,
+%% checkpoint before processing the records.
 handle_event(?INTERNAL, #{<<"action">> := <<"processRecords">>,
                           <<"records">> := Records} = R,
              {?DISPATCH, ?REQUEST},
              #data{handler_module = Mod,
                    worker_state = {ok, WorkerState}} = Data) ->
     case Mod:ready(WorkerState) of
-        {ok, NWorkerState} ->
+        {error, _} = Error ->
+            {stop, Error};
+
+        Ready ->
+            {NWorkerState, Checkpoint} = case Ready of
+                                             {ok, S} -> {S, undefined};
+                                             {ok, S, C} -> {S, C}
+                                         end,
             NData = worker_state(Data#data{is_v2 = maps:is_key(<<"millisBehindLatest">>, R)},
                                  NWorkerState),
-            process_records(R, NData, deaggregate_kpl_records(R, Records));
-
-        {error, _} = Error ->
-            {stop, Error}
+            DeaggregatedRecords = deaggregate_kpl_records(R, Records),
+            case Checkpoint of
+                undefined ->
+                    process_records(R, NData, DeaggregatedRecords);
+                _ ->
+                    checkpoint(R, NData, Checkpoint, DeaggregatedRecords)
+            end
     end;
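The dispatch above normalizes the two success shapes of ready/1 and then optionally checkpoints before processing. The same logic as a standalone sketch (illustrative only, not part of the module):

    %% Decide what to do with a worker's ready/1 result: stop on error,
    %% process directly, or checkpoint first and then process.
    dispatch_ready(Mod, WorkerState) ->
        case Mod:ready(WorkerState) of
            {error, _} = Error ->
                {stop, Error};
            {ok, NWorkerState} ->
                {process, NWorkerState};
            {ok, NWorkerState, Checkpoint} ->
                {checkpoint_then_process, NWorkerState, Checkpoint}
        end.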
