211 changes: 211 additions & 0 deletions extra/lib/plausible/ingestion/analyzer.ex
@@ -0,0 +1,211 @@
defmodule Plausible.Ingestion.Analyzer do
@moduledoc """
Service and API for recording ingest requests for particular domains.
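
A sketch of typical usage (illustrative domain and limit; timestamps default
to the current time):

    # Record up to 500 ingest requests for the domain over the next hour
    {:ok, _site} = Plausible.Ingestion.Analyzer.start_recording("example.com", 500)

    # Stop recording and drop anything captured so far
    :ok = Plausible.Ingestion.Analyzer.stop_recording("example.com")
    :ok = Plausible.Ingestion.Analyzer.purge_log("example.com")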
"""

use GenServer

import Ecto.Query, only: [from: 2]

alias Plausible.RateLimit
alias Plausible.Repo
alias __MODULE__

# Global cap on analyzer casts: at most 10 per second across all domains
@max_rate 10
@rate_limit_key "ingestion_analyzer"
# How often the ETS site cache is synced with the database
@refresh_interval :timer.seconds(1)
# Buffered log entries are flushed every 5 seconds, or once 2000 accumulate
@flush_interval :timer.seconds(5)
@flush_threshold 2000
# Log entries older than this are eligible for removal via remove_old_logs/1
@log_retention_hours 24

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
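# Protected named table owned by this process: only the GenServer writes to
# it, while ingestion processes read it directly (read_concurrency: true).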
Analyzer.Sites =
:ets.new(Analyzer.Sites, [
:named_table,
:set,
:protected,
{:read_concurrency, true}
])

schedule_sites_refresh()
flush_timer = schedule_flush()

{:ok, %{active_sites?: false, buffer: [], flush_timer: flush_timer}}
end

def maybe_record(request, headers, drop_reason, now \\ NaiveDateTime.utc_now()) do
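# Cheap pre-checks in the caller process: cast to the GenServer only when at
# least one of the request's domains is actively recorded and the global
# rate limit still allows it.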
if get_active(request.domains) != [] and check_rate_limit() == :ok do
GenServer.cast(__MODULE__, {:record, request, headers, drop_reason, now})
else
:ok
end
end

def start_recording(domain, limit, now \\ NaiveDateTime.utc_now(:second)) do
domain
|> Analyzer.Site.create_changeset(limit, now)
|> Repo.insert(
on_conflict: [set: [updated_at: now, limit: limit]],
conflict_target: :domain,
returning: true
)
end

def stop_recording(domain) do
Repo.delete_all(from as in Analyzer.Site, where: as.domain == ^domain)
:ok
end

def purge_log(domain) do
Repo.delete_all(from l in Analyzer.Log, where: l.domain == ^domain)
:ok
end

def remove_old_logs(now \\ NaiveDateTime.utc_now()) do
cutoff_time = NaiveDateTime.shift(now, hour: -1 * @log_retention_hours)

Repo.delete_all(from l in Analyzer.Log, where: l.inserted_at < ^cutoff_time)
:ok
end

@impl true
def handle_cast({:record, request, headers, drop_reason, now}, state) do
active_sites = if state.active_sites?, do: get_active(request.domains), else: []

if active_sites != [] do
request_payload =
request
|> Map.from_struct()
|> Map.delete(:__meta__)
|> Map.delete(:domains)

headers_payload = Enum.group_by(headers, &elem(&1, 0), &elem(&1, 1))

entries =
Enum.map(active_sites, fn {domain, limit, updated_at} ->
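# Decrement the site's remaining budget in ETS so that at most `limit`
# requests get recorded between cache refreshes.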
:ets.insert(Analyzer.Sites, {domain, limit - 1, updated_at})

%{
domain: domain,
request: request_payload,
headers: headers_payload,
drop_reason: if(drop_reason, do: inspect(drop_reason)),
inserted_at: now
}
end)

new_buffer = entries ++ state.buffer

if length(new_buffer) >= @flush_threshold do
Process.cancel_timer(state.flush_timer)
flush(new_buffer)
flush_timer = schedule_flush()
{:noreply, %{state | buffer: [], flush_timer: flush_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
else
{:noreply, state}
end
end

@impl true
def handle_info(:refresh_sites, state) do
now = NaiveDateTime.utc_now(:second)
active_sites? = refresh_sites(now)
schedule_sites_refresh()

{:noreply, %{state | active_sites?: active_sites?}}
end

def handle_info(:flush, state) do
Process.cancel_timer(state.flush_timer)
flush(state.buffer)
flush_timer = schedule_flush()

{:noreply, %{state | buffer: [], flush_timer: flush_timer}}
end

defp get_active(domains) do
Enum.reduce(domains, [], fn domain, active ->
case lookup_site(domain) do
{:ok, entry} -> [entry | active]
_ -> active
end
end)
end

defp check_rate_limit() do
case RateLimit.check_rate(@rate_limit_key, to_timeout(second: 1), @max_rate) do
{:allow, _} -> :ok
{:deny, _} -> {:error, :rate_limit}
end
end

defp lookup_site(domain) do
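# The table may not exist yet early in boot. A site whose budget is
# exhausted (limit has reached 0) is treated the same as one that is not
# being recorded at all.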
if :ets.whereis(Analyzer.Sites) != :undefined do
case :ets.lookup(Analyzer.Sites, domain) do
[{_, limit, _} = site] when limit > 0 ->
{:ok, site}

_ ->
{:error, :not_found}
end
else
{:error, :not_running}
end
end

defp schedule_sites_refresh() do
Process.send_after(self(), :refresh_sites, @refresh_interval)
end

defp schedule_flush() do
Process.send_after(self(), :flush, @flush_interval)
end

# A timer-driven flush can arrive with an empty buffer; skip the write then.
defp flush([]), do: :ok

defp flush(buffer) do
Repo.insert_all(Analyzer.Log, buffer)
end

defp refresh_sites(now) do
cached_sites =
Analyzer.Sites
|> :ets.tab2list()
|> Map.new(fn {domain, limit, updated_at} ->
{domain, %{domain: domain, limit: limit, updated_at: updated_at}}
end)

site_domains =
from(as in Analyzer.Site,
where: as.valid_until > ^now,
select: %{domain: as.domain, limit: as.limit, updated_at: as.updated_at}
)
|> Repo.all()
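# Keep the cached (decremented) entry unless the DB row was updated more
# recently, e.g. start_recording/3 was called again with a fresh limit.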
|> Enum.map(fn site ->
cached = cached_sites[site.domain]

if is_nil(cached) or NaiveDateTime.compare(site.updated_at, cached.updated_at) == :gt do
site
else
cached
end
end)
|> Enum.map(fn site ->
:ets.insert(Analyzer.Sites, {site.domain, site.limit, site.updated_at})
site.domain
end)
|> MapSet.new()

Enum.each(cached_sites, fn {domain, _} ->
if not MapSet.member?(site_domains, domain) do
:ets.delete(Analyzer.Sites, domain)
end
end)

MapSet.size(site_domains) > 0
end
end
16 changes: 16 additions & 0 deletions extra/lib/plausible/ingestion/analyzer/log.ex
@@ -0,0 +1,16 @@
defmodule Plausible.Ingestion.Analyzer.Log do
@moduledoc """
Schema for ingest request log entries captured by the analyzer
"""

use Ecto.Schema

schema "analyzer_logs" do
field :domain, :string
field :request, :map
field :headers, :map
field :drop_reason, :string

timestamps(updated_at: false, type: :naive_datetime_usec)
end
end
31 changes: 31 additions & 0 deletions extra/lib/plausible/ingestion/analyzer/site.ex
@@ -0,0 +1,31 @@
defmodule Plausible.Ingestion.Analyzer.Site do
@moduledoc """
Schema for sites enrolled in ingest request recording
"""

use Ecto.Schema

import Ecto.Changeset

@valid_duration_seconds 3600
@max_limit 10_000

schema "analyzer_sites" do
field :domain, :string
field :limit, :integer
field :valid_until, :naive_datetime

timestamps()
end

def create_changeset(domain, limit, now) do
valid_until = NaiveDateTime.shift(now, second: @valid_duration_seconds)

%__MODULE__{}
|> cast(%{limit: limit}, [:limit])
|> put_change(:domain, domain)
|> put_change(:valid_until, valid_until)
|> validate_required(:limit)
|> validate_number(:limit, greater_than: 1, less_than_or_equal_to: @max_limit)
end
end
3 changes: 3 additions & 0 deletions lib/plausible/application.ex
@@ -1,3 +1,3 @@
defmodule Plausible.Application do
@moduledoc false

@@ -152,6 +152,9 @@
{Phoenix.PubSub, name: Plausible.PubSub},
endpoint,
{Oban, Application.get_env(:plausible, Oban)},
on_ee do
Plausible.Ingestion.Analyzer
end,
on_ee do
help_scout_vault()
end
13 changes: 13 additions & 0 deletions lib/plausible_web/controllers/api/external_controller.ex
@@ -6,6 +6,7 @@ end

defmodule PlausibleWeb.Api.ExternalController do
use PlausibleWeb, :controller
use Plausible
require Logger

alias Plausible.Ingestion
@@ -15,6 +16,10 @@ defmodule PlausibleWeb.Api.ExternalController do
_ <- Sentry.Context.set_extra_context(%{request: request}) do
case Ingestion.Event.build_and_buffer(request) do
{:ok, %{dropped: [], buffered: _buffered}} ->
on_ee do
Ingestion.Analyzer.maybe_record(request, conn.req_headers, nil)
end

conn
|> put_status(202)
|> text("ok")
@@ -30,6 +35,14 @@ defmodule PlausibleWeb.Api.ExternalController do
errors: Plausible.ChangesetHelpers.traverse_errors(first_invalid_changeset)
})
else
on_ee do
Ingestion.Analyzer.maybe_record(
request,
conn.req_headers,
List.first(dropped).drop_reason
)
end

conn
|> put_resp_header("x-plausible-dropped", "#{Enum.count(dropped)}")
|> put_status(202)
30 changes: 30 additions & 0 deletions priv/repo/migrations/20250906195657_add_analyzer_schemas.exs
@@ -0,0 +1,30 @@
defmodule Plausible.Repo.Migrations.AddAnalyzerSchemas do
use Ecto.Migration

import Plausible.MigrationUtils

def change do
if enterprise_edition?() do
create table(:analyzer_sites) do
add :domain, :text, null: false
add :limit, :integer, null: false
add :valid_until, :naive_datetime, null: false

timestamps()
end

create unique_index(:analyzer_sites, [:domain])

create table(:analyzer_logs) do
add :domain, :text, null: false
add :request, :jsonb, null: false
add :headers, :jsonb, null: false
add :drop_reason, :text, null: true

timestamps(updated_at: false, type: :naive_datetime_usec)
end

create index(:analyzer_logs, [:domain])
end
end
end