Description
I've set up FLAME with a supervisor to spin up about 120 named runners:
```elixir
defmodule Gambit.Supervisor do
  use Supervisor
  require Logger

  def start_link(_) do
    Supervisor.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_) do
    children =
      Lang.all("stanza")
      |> Enum.filter(& &1)
      |> Enum.uniq()
      |> Enum.map(fn code ->
        {
          FLAME.Pool,
          name: Gambit.to_pool(code),
          min: 0,
          max: 2,
          max_concurrency: 100,
          idle_shutdown_after: {:timer.minutes(10), fn -> Gambit.Producer.clear?() end},
          boot_timeout: :timer.minutes(5),
          timeout: :timer.minutes(5),
          log: :info,
          on_grow_start: &IO.inspect(&1, label: "[FLAME | growing | #{code}]"),
          on_grow_end: &IO.inspect(Map.put(&2, :status, &1), label: "[FLAME | success | #{code}]"),
          on_shrink: &IO.inspect(&1, label: "[FLAME | closing | #{code}]")
        }
      end)
      |> Enum.concat([
        {Gambit.Producer, []},
        {Gambit.Consumer, []}
      ])
      |> Enum.filter(& &1)

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```
Then I added `Gambit.Supervisor` to my `application.ex` supervision tree.
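For context, a minimal sketch of that wiring (the application module name and the other children shown are placeholders, not my real code):

```elixir
# lib/gambit/application.ex — hypothetical shape of the app supervision tree
defmodule Gambit.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # ... other children (Repo, Endpoint, etc.) ...
      Gambit.Supervisor
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Gambit.AppSupervisor)
  end
end
```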
I'm then using a Broadway processing pipeline (all stages with `concurrency: 1`) and triggered 186 invocations. The max demand for the Broadway pipeline is 10.

Over the next 10 minutes, the messages were processed by Broadway, and FLAME servers spun up. However, many more machines spun up than expected.
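Roughly, the Broadway topology looks like this (a sketch only; the pipeline name and producer module here are placeholders, not the real code):

```elixir
# Hypothetical shape of the pipeline described above:
# every stage at concurrency: 1, with max_demand: 10 on the processor.
defmodule Gambit.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {MyQueueProducer, []}, # placeholder producer
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1, max_demand: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # each message ends up triggering a FLAME.call/2 via Gambit.spanwords/2
    message
  end
end
```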
Fly configuration was pretty straightforward:
```elixir
config :flame, :backend, FLAME.FlyBackend

config :flame, FLAME.FlyBackend,
  token: fly_token,
  cpus: 2,
  cpu_kind: "shared",
  memory_mb: 2048,
  env: %{
    "DATABASE_URL" => database_url,
    "POOL_SIZE" => "1",
    "SECRET_KEY_BASE" => secret_key_base
  }
```
Am I missing something major here? Everything runs, and cleans up after 10 minutes of inactivity, so that's good. There also seems to be an issue where the batching is not working, although I'm not sure if that's causing the FLAME issue, caused by the FLAME issue, or completely unrelated.
The FLAME call itself looks like:
```elixir
def spanwords(text, language) do
  lang = Lang.find(language)
  pool = to_pool(lang["stanza"])

  try do
    FLAME.call(pool, fn ->
      try do
        Producer.spanwords(text, lang["xxx"])
      catch
        :exit, reason ->
          Logger.error("[runner] " <> Exception.format_exit(reason))
          {:error, {:exit, reason, :runner}}
      end
    end)
  catch
    :exit, reason ->
      Logger.error("[caller] " <> Exception.format_exit(reason))
      {:error, {:exit, reason, :caller}}
  end
end

def to_pool(stanza) do
  underscored = String.replace(stanza, ~r"[^a-zA-Z0-9]", "_")
  String.to_atom("gambit_pool_#{underscored}")
end
```
`Producer` is a GenStage producer that accepts requests and stores them in internal state, then sends them in batches to a GenStage consumer, which handles them in batches and replies back to the originating `from` pid.
My only guess is that the long reply is "blocking" FLAME from seeing that a child already exists? Maybe? Except it does hold itself back, only creating 5 children, so it must know about them to some extent. And it seemed to create them over time, not all at once, so I'm definitely confused.
Another possibility (which seems more unlikely to me): perhaps `String.to_atom/1` is creating multiple atoms that then don't share resources? I'm not sure how FLAME uses the name, but `String.to_atom("foo") == String.to_atom("foo")`, so I don't think so. ¯\_(ツ)_/¯
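For what it's worth, atoms are interned globally on the BEAM, so repeated `String.to_atom/1` calls on the same string return the identical atom. A quick standalone sanity check (reproducing `to_pool/1` as an anonymous function, independent of FLAME):

```elixir
# to_pool/1 from above, rewritten as an anonymous fn for a standalone check
to_pool = fn stanza ->
  underscored = String.replace(stanza, ~r"[^a-zA-Z0-9]", "_")
  String.to_atom("gambit_pool_#{underscored}")
end

# The same input string always yields the same atom (strict equality)...
true = to_pool.("zh-hant") === to_pool.("zh-hant")

# ...and non-alphanumeric characters are normalized consistently:
:gambit_pool_zh_hant = to_pool.("zh-hant")

IO.puts("atoms identical: #{to_pool.("en") === to_pool.("en")}")
```

So every call with the same `stanza` value should hit the same named pool.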
Does this seem like expected behavior?
EDIT: It should be noted that they all spun down ~10 minutes later.