Skip to content

Commit ca20b7d

Browse files
authored
Document the Pager module and add support for from_datetime (#216)
* Document the Pager module and add support for from_datetime * mix format
1 parent a2ea72a commit ca20b7d

File tree

2 files changed

+80
-5
lines changed

2 files changed

+80
-5
lines changed

lib/gnat/jetstream/pager.ex

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,37 @@
11
defmodule Gnat.Jetstream.Pager do
2-
@moduledoc false
2+
@moduledoc """
3+
Page through all the messages in a stream
4+
5+
This module provides a synchronous API to inspect the messages in a stream.
6+
You can use the reduce module to write a simple function that works like `Enum.reduce` across each message individually.
7+
If you want to handle messages in batches, you can use the `init` + `page` functions to accomplish that.
8+
"""
39

410
alias Gnat.Jetstream
511
alias Gnat.Jetstream.API.{Consumer, Util}
612

713
@opaque pager :: map()
814
@type message :: Gnat.message()
9-
15+
@type opts :: list(opt())
16+
17+
@typedoc """
18+
Options you can pass to the pager
19+
20+
* `batch` controls the maximum number of messages we'll pull in each page/batch (default 10)
21+
* `domain` You can specify a jetstream domain if needed
22+
* `from_datetime` Only page through messages recorded on or after this datetime
23+
* `from_seq` Only page through messages with a sequence number equal or above this option
24+
* `headers_only` You can pass `true` to this if you only want to see the headers from each message. Can be useful to get metadata without having to receieve large body payloads.
25+
26+
"""
27+
@type opt ::
28+
{:batch, non_neg_integer()}
29+
| {:domain, String.t()}
30+
| {:from_datetime, DateTime.t()}
31+
| {:from_seq, non_neg_integer}
32+
| {:headers_only, boolean()}
33+
34+
@spec init(Gnat.t(), String.t(), opts()) :: {:ok, pager()} | {:error, term()}
1035
def init(conn, stream_name, opts) do
1136
domain = Keyword.get(opts, :domainl)
1237

@@ -55,6 +80,26 @@ defmodule Gnat.Jetstream.Pager do
5580
end
5681
end
5782

83+
@doc """
84+
Similar to Enum.reduce but you can iterate through all messages in a stream
85+
86+
```
87+
# Assume we have a stream with messages like "1", "2", ... "10"
88+
Gnat.Jetstream.Pager.reduce(:gnat, "NUMBERS_STREAM", [batch_size: 5], 0, fn(message, total) ->
89+
num = String.to_integer(message.body)
90+
total + num
91+
end)
92+
93+
# => {:ok, 55}
94+
```
95+
"""
96+
@spec reduce(
97+
Gnat.t(),
98+
String.t(),
99+
opts(),
100+
Enum.acc(),
101+
(Gnat.message(), Enum.acc() -> Enum.acc())
102+
) :: {:ok, Enum.acc()} | {:error, term()}
58103
def reduce(conn, stream_name, opts, initial_state, fun) do
59104
with {:ok, pager} <- init(conn, stream_name, opts) do
60105
page_through(pager, initial_state, fun)
@@ -109,12 +154,17 @@ defmodule Gnat.Jetstream.Pager do
109154

110155
## Helpers for accepting user options
111156
defp apply_opts_to_consumer(consumer = %Consumer{}, opts) do
112-
case Keyword.get(opts, :from_seq) do
113-
nil ->
157+
from = {Keyword.get(opts, :from_seq), Keyword.get(opts, :from_datetime)}
158+
159+
case from do
160+
{nil, nil} ->
114161
consumer
115162

116-
seq when is_integer(seq) ->
163+
{seq, _} when is_integer(seq) ->
117164
%Consumer{consumer | deliver_policy: :by_start_sequence, opt_start_seq: seq}
165+
166+
{_, %DateTime{} = dt} ->
167+
%Consumer{consumer | deliver_policy: :by_start_time, opt_start_time: dt}
118168
end
119169
end
120170
end

test/jetstream/pager_test.exs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,31 @@ defmodule Gnat.Jetstream.PagerTest do
2929
Stream.delete(:gnat, "pager_a")
3030
end
3131

32+
test "paging from a datetime" do
33+
{:ok, _stream} = create_stream("pager_b")
34+
35+
Enum.each(1..50, fn i ->
36+
:ok = Gnat.pub(:gnat, "input.pager_b", "#{i}")
37+
end)
38+
39+
timestamp = DateTime.utc_now()
40+
41+
Enum.each(51..100, fn i ->
42+
:ok = Gnat.pub(:gnat, "input.pager_b", "#{i}")
43+
end)
44+
45+
{:ok, res} =
46+
Pager.reduce(:gnat, "pager_b", [from_datetime: timestamp], 0, fn msg, total ->
47+
total + String.to_integer(msg.body)
48+
end)
49+
50+
# The datetime isn't exactly synced between nats and the client
51+
# so we use a pretty fuzzy check here. This test is mostly to
52+
# provide coverage for accepting the option
53+
assert res <= 5050
54+
assert res >= 3775
55+
end
56+
3257
defp create_stream(name) do
3358
stream = %Stream{
3459
name: name,

0 commit comments

Comments
 (0)