[Recipe] Transient subscription #17

@slashdotdash

This recipe shows how to create a transient subscription to a single stream, or to all streams, and wait until an expected event is received.

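Use Commanded.EventStore.subscribe/2 to subscribe the current process, which then receives appended events as {:events, events} messages, as shown in the following example.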

alias Commanded.EventStore.RecordedEvent

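# Block until the selector halts on one of the received events.
defp receive_events(selector) do
  receive do
    {:events, events} ->
      case reduce(events, selector) do
        # No event in this batch matched; wait for the next batch.
        :cont -> receive_events(selector)
        {:halt, result} -> result
      end
  end
end

# Batch exhausted without a match.
defp reduce([], _selector), do: :cont

# Apply the selector to each event's data, stopping at the first `{:halt, result}`.
defp reduce([%RecordedEvent{data: data} | events], selector) do
  case selector.(data) do
    :cont -> reduce(events, selector)
    {:halt, result} -> {:halt, result}
  end
end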

Usage

:ok = Commanded.EventStore.subscribe(MyApp.App, stream_uuid)

receive_events(fn
  %AnEvent{} = event -> {:halt, event}
  _event -> :cont
end)

Use :all as the stream identity to subscribe to all events.
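
The receive loop above blocks forever if the expected event never arrives, which can hang a test. Here is a minimal sketch of a variant with a timeout. The module name TransientSubscription, the timeout argument, and the {:ok, result} / {:error, :timeout} return shape are assumptions for illustration, not part of Commanded. To keep the sketch runnable without Commanded, it matches on any map with a :data key (which a RecordedEvent struct also satisfies) and simulates the event store by sending the {:events, events} message directly.

```elixir
defmodule TransientSubscription do
  # Hypothetical helper module; the name and the timeout argument are
  # assumptions for this sketch, not part of Commanded.

  # Waits for `{:events, events}` messages until `selector` halts, or until
  # no message arrives within `timeout` milliseconds.
  def receive_events(selector, timeout \\ 5_000) when is_function(selector, 1) do
    receive do
      {:events, events} ->
        case reduce(events, selector) do
          :cont -> receive_events(selector, timeout)
          {:halt, result} -> {:ok, result}
        end
    after
      timeout -> {:error, :timeout}
    end
  end

  defp reduce([], _selector), do: :cont

  # Matches any map or struct with a `:data` key, so it also accepts
  # `%Commanded.EventStore.RecordedEvent{}` without depending on it here.
  defp reduce([%{data: data} | events], selector) do
    case selector.(data) do
      :cont -> reduce(events, selector)
      {:halt, result} -> {:halt, result}
    end
  end
end

# Simulate the event store publishing a batch to the current process.
send(self(), {:events, [%{data: :ignored}, %{data: {:wanted, 1}}]})

selector = fn
  {:wanted, _} = event -> {:halt, event}
  _event -> :cont
end

# The second event in the batch matches.
{:ok, {:wanted, 1}} = TransientSubscription.receive_events(selector, 100)

# The mailbox is now empty, so the call gives up after 10 ms.
{:error, :timeout} = TransientSubscription.receive_events(selector, 10)
```

Note that the timeout here applies to each wait for a message rather than to the call as a whole, so a steady flow of non-matching events keeps the loop alive. Tracking a deadline would bound the total wait.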
