Skip to content

Commit 9acdc6d

Browse files
authored
Simplify kafka retransmit workflow (#5)
1 parent 5044c5a commit 9acdc6d

4 files changed

Lines changed: 12 additions & 14 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ PICS=$(patsubst %.uml,%.png,$(wildcard doc/images/*.uml))
22

33
ENV_FILE=.env
44

5-
MIN_COVERAGE := 80
5+
MIN_COVERAGE := 75
66
BUILD_DIR := $(CURDIR)/_build
77
CONCUERROR := $(BUILD_DIR)/Concuerror/bin/concuerror
88
CONCUERROR_RUN := $(CONCUERROR) -x snabbkaffe_collector -x snabbkaffe -x snabbkaffe_nemesis -x snabbkaffe_sup \

src/application/kflow_produce_to_kafka.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ init(Config = #{topic := Topic}) ->
8787
}.
8888

8989
%% @private
90-
map(Offset, Msg = #{partition := P, values := VV}, State) ->
90+
map(Offset, Msg = #{partition := P, value := Val}, State) ->
9191
#s{ client = Client
9292
, topic = Topic
9393
} = State,
@@ -97,7 +97,7 @@ map(Offset, Msg = #{partition := P, values := VV}, State) ->
9797
, partition => P
9898
, offset => Offset
9999
}),
100-
case brod:produce_sync_offset(Client, Topic, P, <<>>, VV) of
100+
case brod:produce_sync_offset(Client, Topic, P, <<>>, Val) of
101101
{ok, OutOffset} ->
102102
prometheus_gauge:set( <<"kflow_kafka_producer_offset">>
103103
, [Topic, P]

src/framework/kflow_kafka_commit.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
-include("kflow_int.hrl").
1313

14+
-ifndef(TEST).
15+
-define(TEST, false).
16+
-endif.
17+
1418
%% kflow_gen callbacks:
1519
-export([ init/2
1620
, handle_message/3
@@ -97,7 +101,8 @@ handle_message( Msg = #kflow_msg{ offset = Offset
97101
, last_committed_offset => LastCommittedOffset
98102
, group_id => GroupId
99103
, route => Route
100-
})
104+
}),
105+
?TEST andalso error(offset_tracking_bug)
101106
end,
102107
{ok, [Msg], State#{last_committed_offset => SafeOffset}}.
103108

src/workflows/kflow_kafka_retransmit.erl

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,9 @@ pipe_spec(Config) ->
8282
PartFun = maps:get(part_fun, Config, fun partition_by_key/2),
8383
BufferConfig = maps:with([max_size, max_messages], Config),
8484
Preprocess ++
85-
[ %% First, choose what partition the message should end up in the downstream topic:
86-
{map, ?MODULE,
87-
{PartFun, NPartitions}}
88-
%% Then separate messages by partition:
89-
, {demux,
90-
fun(_Offset, #{?out_part := P}) -> P end}
91-
%% Group messages in chunks:
92-
, {aggregate, kflow_group_kafka_messages,
93-
BufferConfig}
94-
%% And finally push chunks to another topic:
85+
[ %% Choose what partition the message should end up in the downstream topic:
86+
{map, ?MODULE, {PartFun, NPartitions}}
87+
%% Push chunks to another topic:
9588
, {map, kflow_produce_to_kafka,
9689
#{ topic => ToTopic
9790
, client => ToClient

0 commit comments

Comments
 (0)