diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 761c65f8..73e2b4ab 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -328,7 +328,10 @@ defmodule Supavisor.ClientHandler do idle_timeout: opts.idle_timeout } - Registry.register(@clients_registry, data.id, started_at: System.monotonic_time()) + Registry.register(@clients_registry, data.id, + started_at: System.monotonic_time(), + app_name: data.app_name + ) cond do data.client_ready -> @@ -354,7 +357,7 @@ defmodule Supavisor.ClientHandler do case Supavisor.get_pool_ranch(data.id) do {:ok, pool_ranch} -> Logger.metadata(proxy: true) - Registry.register(@proxy_clients_registry, data.id, []) + Registry.register(@proxy_clients_registry, data.id, app_name: data.app_name) {:keep_state, %{data | pool_ranch: pool_ranch}, {:next_event, :internal, :connect_db}} @@ -903,7 +906,7 @@ defmodule Supavisor.ClientHandler do else: :auth_query connection_params = %Supavisor.ConnectionParameters{ - application_name: data.app_name || "Supavisor", + application_name: data.app_name, database: db_name, host: to_charlist(info.tenant.db_host), sni_hostname: diff --git a/lib/supavisor/client_handler/protocol_helpers.ex b/lib/supavisor/client_handler/protocol_helpers.ex index e5d7cd89..27ee25a2 100644 --- a/lib/supavisor/client_handler/protocol_helpers.ex +++ b/lib/supavisor/client_handler/protocol_helpers.ex @@ -113,15 +113,15 @@ defmodule Supavisor.ClientHandler.ProtocolHelpers do @doc """ Normalizes application name from client connection. - Returns sanitized string or default "Supavisor" for invalid names. + Returns sanitized string or default "" for missing/invalid names. """ @spec normalize_app_name(any()) :: String.t() def normalize_app_name(name) when is_binary(name), do: name - def normalize_app_name(nil), do: "Supavisor" + def normalize_app_name(nil), do: "" def normalize_app_name(name) do Logger.debug("ClientHandler: Invalid application name #{inspect(name)}") - "Supavisor" + "" end @doc """ diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index 8165c501..0f9b25ae 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -166,15 +166,23 @@ defmodule Supavisor.DbHandler do %{} -> {args.id, Supavisor.Manager.get_config(args.id)} end + proxy = Map.get(config, :proxy, false) + Helpers.set_log_level(config.log_level) Helpers.set_max_heap_size(90) - Logger.metadata(project: config.tenant, user: config.user, mode: config.mode) + conn_params = + if proxy do + config.connection_params + else + %ConnectionParameters{config.connection_params | application_name: "Supavisor"} + end + data = %{ id: id, sock: nil, - connection_params: config.connection_params, + connection_params: conn_params, user: config.user, tenant: config.tenant, tenant_feature_flags: config.tenant_feature_flags, @@ -184,7 +192,7 @@ defmodule Supavisor.DbHandler do server_proof: nil, stats: %{}, prepared_statements: MapSet.new(), - proxy: Map.get(config, :proxy, false), + proxy: proxy, client_tls: Map.get(config, :client_tls), client_jit: Map.get(config, :client_jit), stream_state: MessageStreamer.new_stream_state(BackendMessageHandler), diff --git a/lib/supavisor/monitoring/tenant.ex b/lib/supavisor/monitoring/tenant.ex index 05fd556f..83bc0097 100644 --- a/lib/supavisor/monitoring/tenant.ex +++ b/lib/supavisor/monitoring/tenant.ex @@ -248,28 +248,35 @@ defmodule Supavisor.PromEx.Plugins.Tenant do event_name: [:supavisor, :connections], description: "The total count of active clients for a tenant.", measurement: :active, - tags: @tags + tags: @tags ++ [:app_name] ) ] ) end def execute_tenant_metrics do - Registry.select(Supavisor.Registry.TenantClients, [{{:"$1", :_, :_}, [], [:"$1"]}]) - |> Enum.frequencies_by(&Supavisor.id(&1, upstream_tls: false)) - |> Enum.each(&emit_telemetry_for_tenant/1) + Supavisor.Registry.TenantClients + |> Registry.select([{{:"$1", :_, :"$2"}, [], [{{:"$1", :"$2"}}]}]) + |> Enum.frequencies_by(fn {id, meta} -> + {Supavisor.id(id, upstream_tls: false), meta[:app_name] || ""} + end) + |> Enum.each(fn {{id, app_name}, count} -> + emit_telemetry_for_tenant(id, count, app_name) + end) end - @spec emit_telemetry_for_tenant({S.id(), non_neg_integer()}) :: :ok + @spec emit_telemetry_for_tenant(S.id(), non_neg_integer(), String.t()) :: :ok def emit_telemetry_for_tenant( - {Supavisor.id( - type: type, - tenant: tenant, - user: user, - mode: mode, - db: db_name, - search_path: search_path - ), count} + Supavisor.id( + type: type, + tenant: tenant, + user: user, + mode: mode, + db: db_name, + search_path: search_path + ), + count, + app_name ) do :telemetry.execute( [:supavisor, :connections], @@ -280,7 +287,8 @@ defmodule Supavisor.PromEx.Plugins.Tenant do mode: mode, type: type, db_name: db_name, - search_path: search_path + search_path: search_path, + app_name: app_name } ) end @@ -296,7 +304,7 @@ defmodule Supavisor.PromEx.Plugins.Tenant do event_name: [:supavisor, :client, :connection, :lifetime], measurement: :lifetime, description: "How long the client connection has been alive.", - tags: @tags, + tags: @tags ++ [:app_name], unit: {:native, :millisecond}, reporter_options: [ peep_bucket_calculator: ClientConnectionLifetimeBuckets @@ -342,7 +350,8 @@ defmodule Supavisor.PromEx.Plugins.Tenant do mode: mode, type: type, db_name: db_name, - search_path: search_path + search_path: search_path, + app_name: meta[:app_name] || "" } ) end @@ -359,28 +368,35 @@ defmodule Supavisor.PromEx.Plugins.Tenant do event_name: [:supavisor, :proxy, :connections], description: "The total count of active proxy clients for a tenant.", measurement: :active, - tags: @tags + tags: @tags ++ [:app_name] ) ] ) end def execute_tenant_proxy_metrics do - Registry.select(Supavisor.Registry.TenantProxyClients, [{{:"$1", :_, :_}, [], [:"$1"]}]) - |> Enum.frequencies_by(&Supavisor.id(&1, upstream_tls: false)) - |> Enum.each(&emit_proxy_telemetry_for_tenant/1) + Supavisor.Registry.TenantProxyClients + |> Registry.select([{{:"$1", :_, :"$2"}, [], [{{:"$1", :"$2"}}]}]) + |> Enum.frequencies_by(fn {id, meta} -> + {Supavisor.id(id, upstream_tls: false), meta[:app_name] || ""} + end) + |> Enum.each(fn {{id, app_name}, count} -> + emit_proxy_telemetry_for_tenant(id, count, app_name) + end) end - @spec emit_proxy_telemetry_for_tenant({S.id(), non_neg_integer()}) :: :ok + @spec emit_proxy_telemetry_for_tenant(S.id(), non_neg_integer(), String.t()) :: :ok def emit_proxy_telemetry_for_tenant( - {Supavisor.id( - type: type, - tenant: tenant, - user: user, - mode: mode, - db: db_name, - search_path: search_path - ), count} + Supavisor.id( + type: type, + tenant: tenant, + user: user, + mode: mode, + db: db_name, + search_path: search_path + ), + count, + app_name ) do :telemetry.execute( [:supavisor, :proxy, :connections], @@ -391,7 +407,8 @@ defmodule Supavisor.PromEx.Plugins.Tenant do mode: mode, type: type, db_name: db_name, - search_path: search_path + search_path: search_path, + app_name: app_name } ) end diff --git a/test/integration/proxy_test.exs b/test/integration/proxy_test.exs index 3470937f..3956a828 100644 --- a/test/integration/proxy_test.exs +++ b/test/integration/proxy_test.exs @@ -4,6 +4,7 @@ defmodule Supavisor.Integration.ProxyTest do require Logger require Supavisor + alias Ecto.Adapters.SQL.Sandbox alias Postgrex, as: P alias Supavisor.Support.{Cluster, SSLHelper} @@ -135,6 +136,177 @@ defmodule Supavisor.Integration.ProxyTest do end end + @tag cluster: true + test "app_name is set correctly for the proxy connection" do + db_conf = Application.get_env(:supavisor, Supavisor.Repo) + username = db_conf[:username] + + tenant = + Sandbox.unboxed_run(Supavisor.Repo, fn -> + random_suffix = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower) + tenant_id = "app_name_test_#{System.unique_integer([:positive])}_#{random_suffix}" + + {:ok, _} = + Supavisor.Tenants.create_tenant(%{ + db_host: to_string(db_conf[:hostname]), + db_port: db_conf[:port], + db_database: db_conf[:database], + default_parameter_status: %{}, + external_id: tenant_id, + require_user: true, + availability_zone: "ap-southeast-1c", + users: [ + %{ + "db_user" => db_conf[:username], + "db_password" => db_conf[:password], + "pool_size" => 3, + "mode_type" => "transaction" + } + ] + }) + + on_exit(fn -> Supavisor.Tenants.delete_tenant_by_external_id(tenant_id) end) + tenant_id + end) + + id = + Supavisor.id( + type: :single, + tenant: tenant, + user: username, + mode: :transaction, + db: db_conf[:database] + ) + + assert {:ok, _pid, node2} = Cluster.start_node() + Node.connect(node2) + + {:ok, conn} = + Postgrex.start_link( + hostname: db_conf[:hostname], + port: Application.get_env(:supavisor, :proxy_port_transaction), + database: db_conf[:database], + password: db_conf[:password], + username: "#{username}.#{tenant}", + parameters: [application_name: "my_test_app"] + ) + + # Upstream DB connection should always identify as "Supavisor" + assert %P.Result{rows: [["Supavisor"]]} = + P.query!(conn, "SELECT current_setting('application_name')", []) + + # node2 is the pool node + :ok = + wait_until(fn -> + :rpc.call(node2, Registry, :lookup, [ + Supavisor.Registry.TenantClients, + id + ]) != [] + end) + + [{_pid, proxy_meta}] = + :rpc.call(node2, Registry, :lookup, [ + Supavisor.Registry.TenantClients, + id + ]) + + assert proxy_meta[:app_name] == "my_test_app" + + # our node is the proxy node + :ok = + wait_until(fn -> + Registry.lookup(Supavisor.Registry.TenantProxyClients, id) != [] + end) + + [{_pid, pool_meta}] = Registry.lookup(Supavisor.Registry.TenantProxyClients, id) + assert pool_meta[:app_name] == "my_test_app" + end + + @tag cluster: true + test "app_name defaults to empty string when not provided" do + db_conf = Application.get_env(:supavisor, Supavisor.Repo) + username = db_conf[:username] + + tenant = + Sandbox.unboxed_run(Supavisor.Repo, fn -> + random_suffix = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower) + tenant_id = "app_name_empty_#{System.unique_integer([:positive])}_#{random_suffix}" + + {:ok, _} = + Supavisor.Tenants.create_tenant(%{ + db_host: to_string(db_conf[:hostname]), + db_port: db_conf[:port], + db_database: db_conf[:database], + default_parameter_status: %{}, + external_id: tenant_id, + require_user: true, + availability_zone: "ap-southeast-1c", + users: [ + %{ + "db_user" => db_conf[:username], + "db_password" => db_conf[:password], + "pool_size" => 3, + "mode_type" => "transaction" + } + ] + }) + + on_exit(fn -> Supavisor.Tenants.delete_tenant_by_external_id(tenant_id) end) + tenant_id + end) + + id = + Supavisor.id( + type: :single, + tenant: tenant, + user: username, + mode: :transaction, + db: db_conf[:database] + ) + + assert {:ok, _pid, node2} = Cluster.start_node() + Node.connect(node2) + + {:ok, conn} = + Postgrex.start_link( + hostname: db_conf[:hostname], + port: Application.get_env(:supavisor, :proxy_port_transaction), + database: db_conf[:database], + password: db_conf[:password], + username: "#{username}.#{tenant}" + ) + + # Upstream DB connection should always identify as "Supavisor" + assert %P.Result{rows: [["Supavisor"]]} = + P.query!(conn, "SELECT current_setting('application_name')", []) + + # node2 is the pool node + :ok = + wait_until(fn -> + :rpc.call(node2, Registry, :lookup, [ + Supavisor.Registry.TenantClients, + id + ]) != [] + end) + + [{_pid, proxy_meta}] = + :rpc.call(node2, Registry, :lookup, [ + Supavisor.Registry.TenantClients, + id + ]) + + assert proxy_meta[:app_name] == "" + + # our node is the proxy node + :ok = + wait_until(fn -> + Registry.lookup(Supavisor.Registry.TenantProxyClients, id) != [] + end) + + [{_pid, pool_meta}] = Registry.lookup(Supavisor.Registry.TenantProxyClients, id) + assert pool_meta[:app_name] == "" + end + for tenant <- @tenants do test "select with #{tenant}" do %{proxy: proxy, origin: origin} = setup_tenant_connections(unquote(tenant)) diff --git a/test/supavisor/client_handler_test.exs b/test/supavisor/client_handler_test.exs index a2a176b6..60b74633 100644 --- a/test/supavisor/client_handler_test.exs +++ b/test/supavisor/client_handler_test.exs @@ -142,7 +142,7 @@ defmodule Supavisor.ClientHandlerTest do data = %{sock: {:gen_tcp, :fake_port}, id: "test", app_name: nil} - assert {:keep_state, %{app_name: "Supavisor"}, + assert {:keep_state, %{app_name: ""}, {:next_event, :internal, {:hello, {:single, {"postgres", "dev_tenant", "postgres", nil, false, nil}}}}} = @subject.handle_event(:info, {:tcp, :fake_port, bin}, :handshake, data) diff --git a/test/supavisor/metrics_cleaner_test.exs b/test/supavisor/metrics_cleaner_test.exs index 86006878..722eeb59 100644 --- a/test/supavisor/metrics_cleaner_test.exs +++ b/test/supavisor/metrics_cleaner_test.exs @@ -23,13 +23,15 @@ defmodule Supavisor.MetricsCleanerTest do test "metrics for unknown tenant are removed" do :ok = Metrics.emit_telemetry_for_tenant( - {Supavisor.id( - type: :single, - tenant: "non-existent", - user: "foo", - mode: :transaction, - db: "bar" - ), 2137} + Supavisor.id( + type: :single, + tenant: "non-existent", + user: "foo", + mode: :transaction, + db: "bar" + ), + 2137, + "" ) metrics = Supavisor.Monitoring.PromEx.get_metrics() @@ -112,13 +114,15 @@ defmodule Supavisor.MetricsCleanerTest do :ok = Metrics.emit_telemetry_for_tenant( - {Supavisor.id( - type: :single, - tenant: "orphan-tags-test", - user: "foo", - mode: :transaction, - db: "bar" - ), 42} + Supavisor.id( + type: :single, + tenant: "orphan-tags-test", + user: "foo", + mode: :transaction, + db: "bar" + ), + 42, + "" ) # Trigger a scrape so the cache_tid gets populated via the export path @@ -132,7 +136,8 @@ defmodule Supavisor.MetricsCleanerTest do mode: :transaction, type: :single, db_name: "bar", - search_path: nil + search_path: nil, + app_name: "" } [{^tags_map, tags_id}] = :ets.lookup(tags_tid, tags_map) diff --git a/test/supavisor/monitoring/tenant_test.exs b/test/supavisor/monitoring/tenant_test.exs index fe714458..06ca93aa 100644 --- a/test/supavisor/monitoring/tenant_test.exs +++ b/test/supavisor/monitoring/tenant_test.exs @@ -104,7 +104,8 @@ defmodule Supavisor.PromEx.Plugins.TenantTest do mode: :transaction, type: :single, db_name: ctx.db, - search_path: nil + search_path: nil, + app_name: "" } end end @@ -129,7 +130,7 @@ defmodule Supavisor.PromEx.Plugins.TenantTest do start_supervised!( {Task, fn -> - Registry.register(Supavisor.Registry.TenantClients, id, []) + Registry.register(Supavisor.Registry.TenantClients, id, app_name: "myapp") Process.sleep(:infinity) end}, id: :"client_#{i}" @@ -147,12 +148,64 @@ defmodule Supavisor.PromEx.Plugins.TenantTest do mode: :transaction, type: :single, db_name: "test_db", - search_path: nil + search_path: nil, + app_name: "myapp" } refute_receive {^ref, {[:supavisor, :connections], %{active: 3}, _}} refute_receive {^ref, {[:supavisor, :connections], %{active: 2}, _}} end + + test "groups clients by app_name" do + id = + Supavisor.id( + type: :single, + tenant: "metrics_app_name_test", + user: "test_user", + mode: :transaction, + db: "test_db", + search_path: nil, + upstream_tls: false + ) + + clients = [ + {id, [app_name: "webapp"]}, + {id, [app_name: "webapp"]}, + {id, [app_name: "worker"]}, + {id, [app_name: ""]} + ] + + for {{reg_id, meta}, i} <- Enum.with_index(clients) do + start_supervised!( + {Task, + fn -> + Registry.register(Supavisor.Registry.TenantClients, reg_id, meta) + Process.sleep(:infinity) + end}, + id: :"app_name_client_#{i}" + ) + end + + ref = attach_handler([:supavisor, :connections]) + Tenant.execute_tenant_metrics() + + events = + Enum.reduce_while(1..10, [], fn _, acc -> + receive do + {^ref, {[:supavisor, :connections], measurement, meta}} -> + {:cont, [{measurement, meta} | acc]} + after + 100 -> {:halt, acc} + end + end) + + assert length(events) == 3 + + by_app = Map.new(events, fn {%{active: count}, %{app_name: app}} -> {app, count} end) + assert by_app["webapp"] == 2 + assert by_app["worker"] == 1 + assert by_app[""] == 1 + end end describe "execute_tenant_proxy_metrics/0" do @@ -174,7 +227,7 @@ defmodule Supavisor.PromEx.Plugins.TenantTest do start_supervised!( {Task, fn -> - Registry.register(Supavisor.Registry.TenantProxyClients, id, []) + Registry.register(Supavisor.Registry.TenantProxyClients, id, app_name: "proxyapp") Process.sleep(:infinity) end}, id: :"proxy_client_#{i}" @@ -192,7 +245,8 @@ defmodule Supavisor.PromEx.Plugins.TenantTest do mode: :transaction, type: :single, db_name: "test_db", - search_path: nil + search_path: nil, + app_name: "proxyapp" } refute_receive {^ref, {[:supavisor, :proxy, :connections], %{active: 2}, _}}