Skip to content

add state to the group consumer #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ erl_crash.dump

/priv
.tool-versions

.idea/

15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,25 @@ Batch message consumers receive a list of messages and work as part of the `:bro

### Kaffe GroupMember - Batch Message Consumer

1. Define a `handle_messages/1` function in the provided module.
1. Define a `init_handler/0` and a `handle_messages/2` function in the provided module.

`handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
`init_handler/0` This function will be called upon consumer initialization and should return `{:ok, state}`. This state will be passed to the `handle_messages/2`callback

The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`.
`handle_messages/2` This function (note the pluralization) will be called with a *list of messages* and the state, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

The module's `handle_messages/2` function _must_ return `{:ok, state}` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/2` function returns `{:ok, state}`.

```elixir
defmodule MessageProcessor
def handle_messages(messages) do
def init_handler() do
{:ok, 0} # initial state, number of messages received
end
def handle_messages(messages, state) do
for %{key: key, value: value} = message <- messages do
IO.inspect message
IO.puts "#{key}: #{value}"
end
:ok # Important!
{:ok, state + Enum.count(messages)} # Important!
end
end
```
Expand Down
14 changes: 8 additions & 6 deletions lib/kaffe/group_member/worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ defmodule Kaffe.Worker do

Processing the message set is delegated to the configured message
handler. It is responsible for any error handling. The message handler
must define a `handle_messages` function (*note* the pluralization!)
to accept a list of messages.
must define a `init_handler/0` function that should return `{:ok, state}`, and
a `handle_messages` function (*note* the pluralization!)
to accept a list of messages and a state, and returns `{:ok, state}`.

The result of `handle_messages` is sent back to the subscriber.
"""
Expand All @@ -20,21 +21,22 @@ defmodule Kaffe.Worker do

def init([message_handler, worker_name]) do
Logger.info "event#starting=#{__MODULE__} name=#{worker_name}"
{:ok, %{message_handler: message_handler, worker_name: worker_name}}
{:ok, handler_state } = apply(message_handler, :init_handler, [])
{:ok, %{message_handler: message_handler, worker_name: worker_name, handler_state: handler_state}}
end

def process_messages(pid, subscriber_pid, topic, partition, generation_id, messages) do
GenServer.cast(pid, {:process_messages, subscriber_pid, topic, partition, generation_id, messages})
end

def handle_cast({:process_messages, subscriber_pid, topic, partition, generation_id, messages},
%{message_handler: message_handler} = state) do
%{message_handler: message_handler, handler_state: handler_state} = state) do

:ok = apply(message_handler, :handle_messages, [messages])
{:ok, new_handler_state} = apply(message_handler, :handle_messages, [messages, handler_state])
offset = Enum.reduce(messages, 0, &max(&1.offset, &2))
subscriber().ack_messages(subscriber_pid, topic, partition, generation_id, offset)

{:noreply, state}
{:noreply, %{state| handler_state: new_handler_state}}
end

def terminate(reason, _state) do
Expand Down
5 changes: 3 additions & 2 deletions test/kaffe/group_member/worker/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ defmodule Kaffe.WorkerTest do
end

defmodule TestHandler do
def handle_messages(messages) do
def init_handler(), do: {:ok, :some_state}
def handle_messages(messages, state) do
send :test_case, {:handle_messages, messages}
:ok
{:ok, state}
end
end

Expand Down