|
defmodule AshDynamo.DataLayer.Query.Paginator do
  @moduledoc """
  Handles DynamoDB pagination via `LastEvaluatedKey` and `ExclusiveStartKey`.

  DynamoDB returns at most 1MB of data per request. When more items are available,
  the response includes a `LastEvaluatedKey` that must be passed as `ExclusiveStartKey`
  in the next request. This module encapsulates that loop, accumulating pages until
  the requested limit is reached or no more data is available.
  """

  @typedoc "Which DynamoDB operation is used to read each page."
  @type mode :: :query | :scan

  @typedoc "Merged response map keyed by the DynamoDB response field names."
  @type response :: %{required(String.t()) => term()}

  # Result returned when the caller asks for zero items.
  @empty_response %{"Items" => [], "Count" => 0, "ScannedCount" => 0}

  @doc """
  Fetches items from DynamoDB, handling pagination via `LastEvaluatedKey`/`ExclusiveStartKey`.

  ## Parameters

    * `table` - table identifier, passed unchanged to `ExAws.Dynamo.query/2` or
      `ExAws.Dynamo.scan/2`.
    * `mode` - `:query` or `:scan`, selecting the operation used for every page.
    * `base_opts` - keyword list of options sent with every page request. `:limit`
      (when `limit` is given) and `:exclusive_start_key` (from the second page on)
      are overwritten per page.
    * `limit` - maximum number of items to accumulate, or `nil` (the default) to
      keep fetching until DynamoDB reports no further pages. A limit of `0` returns
      an empty result without issuing a request, because DynamoDB rejects a
      `Limit` below 1.

  ## Returns

  `{:ok, response}` where `response` is a merged map with the same shape as a single
  ExAws response: `%{"Items" => [...], "Count" => N, "ScannedCount" => N}`. `"Count"`
  and `"ScannedCount"` are the sums of the per-page values. The merged map does not
  carry a `"LastEvaluatedKey"`.

  Any result from `ExAws.request/1` that is not `{:ok, response}` is returned
  unchanged; pages fetched before the failure are discarded.
  """
  @spec fetch(term(), mode(), keyword(), non_neg_integer() | nil) ::
          {:ok, response()} | {:error, term()}
  def fetch(table, mode, base_opts, limit \\ nil)

  def fetch(_table, _mode, _base_opts, 0), do: {:ok, @empty_response}

  def fetch(table, mode, base_opts, limit) do
    do_fetch(table, mode, base_opts, limit, {[], 0, 0}, nil)
  end

  # Requests one page and recurses until the limit is reached or DynamoDB stops
  # returning a continuation key.
  #
  # The accumulator is `{pages, count, scanned}`. Each page's item list is
  # prepended as a whole (O(1)); `to_response/1` restores the order at the end.
  defp do_fetch(table, mode, base_opts, limit, acc, start_key) do
    # Limiting is delegated to DynamoDB by sending the remaining count as `Limit`.
    # On the first request this equals the original limit; afterwards it shrinks by
    # the number of items already accumulated, so a page cut short by the 1MB
    # boundary never causes over-fetching on the follow-up requests.
    page_opts =
      base_opts
      |> maybe_put(:limit, remaining_limit(limit, acc))
      |> maybe_put(:exclusive_start_key, start_key)

    result =
      mode
      |> case do
        :query -> ExAws.Dynamo.query(table, page_opts)
        :scan -> ExAws.Dynamo.scan(table, page_opts)
      end
      |> ExAws.request()

    # A non-`{:ok, _}` result falls through `with` and is returned as-is.
    with {:ok, resp} <- result do
      merged = accumulate(acc, resp)
      next_key = resp["LastEvaluatedKey"]

      cond do
        limit_reached?(limit, merged) ->
          {:ok, to_response(merged)}

        # Only continue with a usable key: a nil key would not be sent as
        # `ExclusiveStartKey` and the read would restart from the beginning.
        next_key != nil ->
          do_fetch(table, mode, base_opts, limit, merged, next_key)

        true ->
          {:ok, to_response(merged)}
      end
    end
  end

  # Number of items still wanted, or `nil` when the caller set no limit.
  defp remaining_limit(nil, _acc), do: nil
  defp remaining_limit(limit, {_pages, count, _scanned}), do: limit - count

  # True once a limit was given and the accumulated count has reached it.
  defp limit_reached?(nil, _acc), do: false
  defp limit_reached?(limit, {_pages, count, _scanned}), do: count >= limit

  # Folds one page into the accumulator. Missing fields count as empty / zero.
  defp accumulate({pages, count, scanned}, resp) do
    {
      [resp["Items"] || [] | pages],
      count + (resp["Count"] || 0),
      scanned + (resp["ScannedCount"] || 0)
    }
  end

  # Pages are prepended during accumulation (O(1) per page), so we
  # reverse the page order and concatenate into a flat item list.
  defp to_response({pages, count, scanned}) do
    items = pages |> Enum.reverse() |> Enum.concat()
    %{"Items" => items, "Count" => count, "ScannedCount" => scanned}
  end

  # Sets `key` in the keyword list unless the value is nil.
  defp maybe_put(opts, _key, nil), do: opts
  defp maybe_put(opts, key, value), do: Keyword.put(opts, key, value)
end
0 commit comments