Skip to content

Commit 8783061

Browse files
committed
add [:phoenix, :socket_drain] telemetry event
and also use it for logging drain events
1 parent 6443bd5 commit 8783061

File tree

6 files changed

+54
-13
lines changed

6 files changed

+54
-13
lines changed

installer/templates/phx_web/telemetry.ex

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ defmodule <%= @web_namespace %>.Telemetry do
4343
summary("phoenix.socket_connected.duration",
4444
unit: {:native, :millisecond}
4545
),
46+
sum("phoenix.socket_drain.count"),
4647
summary("phoenix.channel_joined.duration",
4748
unit: {:native, :millisecond}
4849
),

lib/phoenix/endpoint.ex

+2
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,8 @@ defmodule Phoenix.Endpoint do
784784
batch to terminate. Defaults to 2000ms.
785785
* `:shutdown` - The maximum amount of time in milliseconds allowed
786786
to drain all batches. Defaults to 30000ms.
787+
* `:log` - the log level for drain actions. Defaults the `:log` option
788+
passed to `use Phoenix.Socket` or `:info`. Set it to `false` to disable logging.
787789
788790
For example, if you have 150k connections, the default values will
789791
split them into 15 batches of 10k connections. Each batch takes

lib/phoenix/logger.ex

+22
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ defmodule Phoenix.Logger do
5858
* Metadata: `%{endpoint: atom, transport: atom, params: term, connect_info: map, vsn: binary, user_socket: atom, result: :ok | :error, serializer: atom, log: Logger.level | false}`
5959
* Disable logging: `use Phoenix.Socket, log: false` or `socket "/foo", MySocket, websocket: [log: false]` in your endpoint
6060
61+
* `[:phoenix, :socket_drain]` - dispatched by `Phoenix.Socket` when using the `:drainer` option
62+
* Measurement: `%{count: integer, total: integer, index: integer, rounds: integer}`
63+
* Metadata: `%{endpoint: atom, socket: atom, intervasl: integer, log: Logger.level | false}`
64+
* Disable logging: `use Phoenix.Socket, log: false` in your endpoint or pass `:log` option in the `:drainer` option
65+
6166
* `[:phoenix, :channel_joined]` - dispatched at the end of a channel join
6267
* Measurement: `%{duration: native_time}`
6368
* Metadata: `%{result: :ok | :error, params: term, socket: Phoenix.Socket.t}`
@@ -134,6 +139,7 @@ defmodule Phoenix.Logger do
134139
[:phoenix, :router_dispatch, :start] => &__MODULE__.phoenix_router_dispatch_start/4,
135140
[:phoenix, :error_rendered] => &__MODULE__.phoenix_error_rendered/4,
136141
[:phoenix, :socket_connected] => &__MODULE__.phoenix_socket_connected/4,
142+
[:phoenix, :socket_drain] => &__MODULE__.phoenix_socket_drain/4,
137143
[:phoenix, :channel_joined] => &__MODULE__.phoenix_channel_joined/4,
138144
[:phoenix, :channel_handled_in] => &__MODULE__.phoenix_channel_handled_in/4
139145
}
@@ -339,6 +345,22 @@ defmodule Phoenix.Logger do
339345
defp connect_result(:ok), do: "CONNECTED TO "
340346
defp connect_result(:error), do: "REFUSED CONNECTION TO "
341347

348+
@doc false
349+
def phoenix_socket_drain(_, _, %{log: false}, _), do: :ok
350+
351+
def phoenix_socket_drain(_, %{count: count, total: total, index: index, rounds: rounds}, %{log: level} = meta, _) do
352+
Logger.log(level, fn ->
353+
%{socket: socket, interval: interval} = meta
354+
355+
[
356+
"DRAINING #{count} of #{total} total connection(s) for socket ",
357+
inspect(socket),
358+
" every #{interval}ms - ",
359+
"round #{index} of #{rounds}"
360+
]
361+
end)
362+
end
363+
342364
## Event: [:phoenix, :channel_joined]
343365

344366
@doc false

lib/phoenix/socket.ex

+5-1
Original file line numberDiff line numberDiff line change
@@ -464,10 +464,14 @@ defmodule Phoenix.Socket do
464464
case drainer do
465465
{module, function, arguments} ->
466466
apply(module, function, arguments)
467+
467468
_ ->
468469
drainer
469470
end
470-
{Phoenix.Socket.PoolDrainer, {endpoint, handler, drainer}}
471+
472+
opts = Keyword.merge(opts, drainer: drainer)
473+
474+
{Phoenix.Socket.PoolDrainer, {endpoint, handler, opts}}
471475
else
472476
:ignore
473477
end

lib/phoenix/socket/pool_supervisor.ex

+19-9
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule Phoenix.Socket.PoolDrainer do
7575
%{
7676
id: {:terminator, name},
7777
start: {__MODULE__, :start_link, [tuple]},
78-
shutdown: Keyword.get(opts, :shutdown, 30_000)
78+
shutdown: Keyword.get(opts[:drainer], :shutdown, 30_000)
7979
}
8080
end
8181

@@ -86,13 +86,14 @@ defmodule Phoenix.Socket.PoolDrainer do
8686
@impl true
8787
def init({endpoint, name, opts}) do
8888
Process.flag(:trap_exit, true)
89-
size = Keyword.get(opts, :batch_size, 10_000)
90-
interval = Keyword.get(opts, :batch_interval, 2_000)
91-
{:ok, {endpoint, name, size, interval}}
89+
size = Keyword.get(opts[:drainer], :batch_size, 10_000)
90+
interval = Keyword.get(opts[:drainer], :batch_interval, 2_000)
91+
log_level = Keyword.get(opts[:drainer], :log, opts[:log] || :info)
92+
{:ok, {endpoint, name, size, interval, log_level}}
9293
end
9394

9495
@impl true
95-
def terminate(_reason, {endpoint, name, size, interval}) do
96+
def terminate(_reason, {endpoint, name, size, interval, log_level}) do
9697
ets = endpoint.config({:socket, name})
9798
partitions = :ets.lookup_element(ets, :partitions, 2)
9899

@@ -109,12 +110,21 @@ defmodule Phoenix.Socket.PoolDrainer do
109110

110111
rounds = div(total, size) + 1
111112

112-
if total != 0 do
113-
Logger.info("Shutting down #{total} sockets in #{rounds} rounds of #{interval}ms")
114-
end
115-
116113
for {pids, index} <-
117114
collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do
115+
count = if index == rounds, do: length(pids), else: size
116+
117+
:telemetry.execute(
118+
[:phoenix, :socket_drain],
119+
%{count: count, total: total, index: index, rounds: rounds},
120+
%{
121+
endpoint: endpoint,
122+
socket: name,
123+
interval: interval,
124+
log: log_level
125+
}
126+
)
127+
118128
spawn(fn ->
119129
for pid <- pids do
120130
send(pid, %Phoenix.Socket.Broadcast{event: "phx_drain"})

test/phoenix/socket/socket_test.exs

+5-3
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ defmodule Phoenix.SocketTest do
7676
test "merges keyword lists" do
7777
socket = %Phoenix.Socket{}
7878
socket = assign(socket, %{foo: :bar, abc: :def})
79-
socket = assign(socket, [foo: :baz])
79+
socket = assign(socket, foo: :baz)
8080
assert socket.assigns[:foo] == :baz
8181
assert socket.assigns[:abc] == :def
8282
end
@@ -109,7 +109,8 @@ defmodule Phoenix.SocketTest do
109109
]
110110

111111
assert DrainerSpecSocket.drainer_spec(drainer: drainer_spec, endpoint: Endpoint) ==
112-
{Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}}
112+
{Phoenix.Socket.PoolDrainer,
113+
{Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}}
113114
end
114115

115116
test "loads dynamic drainer config" do
@@ -119,7 +120,8 @@ defmodule Phoenix.SocketTest do
119120
drainer: {DrainerSpecSocket, :dynamic_drainer_config, []},
120121
endpoint: Endpoint
121122
) ==
122-
{Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}}
123+
{Phoenix.Socket.PoolDrainer,
124+
{Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}}
123125
end
124126

125127
test "returns ignore if drainer is set to false" do

0 commit comments

Comments
 (0)