Skip to content

Commit f8562f8

Browse files
committed
Keep Shuttle poller state responsive
1 parent 073e918 commit f8562f8

3 files changed

Lines changed: 229 additions & 39 deletions

File tree

cmd/shuttle/status.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var statusCmd = &cobra.Command{
4343
Use: "status",
4444
Short: "One-line-per-fiber status overview",
4545
Long: `Prints one line per fiber that has a shuttle: block.
46-
Sources: felt ls -j (for fibers with shuttle: blocks) + tmux ls (for live sessions).
46+
Sources: projected felt ls -j (for fibers with shuttle: blocks) + tmux ls (for live sessions).
4747
4848
Columns: fiber_id kind state agent next_due_at
4949
@@ -240,10 +240,13 @@ func listShuttleFibersAcrossHosts(hosts []string) ([]shuttleEntry, error) {
240240
return merged, nil
241241
}
242242

243-
// listShuttleFibers reads every fiber through `felt ls -s all --json` and
244-
// keeps the entries that carry a shuttle block.
243+
// listShuttleFibers reads a narrow projected felt listing and keeps the entries
244+
// that carry a shuttle block. If the projected call fails, it falls back to the unprojected `felt ls -s all --json` listing.
245245
func listShuttleFibers(host string) ([]shuttleEntry, error) {
246-
out, err := exec.Command("felt", "-C", host, "ls", "-s", "all", "--json").Output()
246+
out, err := projectedShuttleFiberListing(host)
247+
if err != nil {
248+
out, err = exec.Command("felt", "-C", host, "ls", "-s", "all", "--json").Output()
249+
}
247250
if err != nil {
248251
return nil, fmt.Errorf("felt ls: %w", err)
249252
}
@@ -279,6 +282,18 @@ func listShuttleFibers(host string) ([]shuttleEntry, error) {
279282
return entries, nil
280283
}
281284

285+
func projectedShuttleFiberListing(host string) ([]byte, error) {
286+
return exec.Command(
287+
"felt",
288+
"-C", host,
289+
"ls",
290+
"-s", "all",
291+
"--json",
292+
"--has-field", "shuttle",
293+
"--json-field", "id,shuttle",
294+
).Output()
295+
}
296+
282297
// liveTmuxSessions returns a set of tmux session names that start with "shuttle-".
283298
func liveTmuxSessions() map[string]bool {
284299
out, err := exec.Command("tmux", "ls", "-F", "#{session_name}").Output()

lib/shuttle/poller.ex

Lines changed: 132 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ defmodule Shuttle.Poller do
4848
@continuation_retry_delay_ms 1_000
4949
@failure_retry_base_ms 10_000
5050
@default_max_retry_backoff_ms 300_000
51+
@felt_shuttle_projection_fields "id,status,created_at,shuttle,depends_on,tempered"
5152

5253
defmodule State do
5354
@moduledoc false
@@ -80,6 +81,7 @@ defmodule Shuttle.Poller do
8081
waiters: %{},
8182
reservations: %{},
8283
completed_standing_runs: MapSet.new(),
84+
standing_roles: [],
8385
# %{fiber_id => felt_host} — populated by discover_candidates/1 on each
8486
# poll cycle and by host_for_fiber/2 on demand. Entries are never evicted
8587
# automatically; call bust_fiber_host_cache/1 when a fiber moves hosts.
@@ -250,8 +252,7 @@ defmodule Shuttle.Poller do
250252
when is_reference(tick_token) do
251253
state = %{
252254
state
253-
| poll_check_in_progress: true,
254-
next_poll_due_at_ms: nil,
255+
| next_poll_due_at_ms: nil,
255256
tick_timer_ref: nil,
256257
tick_token: nil
257258
}
@@ -262,9 +263,38 @@ defmodule Shuttle.Poller do
262263

263264
def handle_info({:tick, _}, state), do: {:noreply, state}
264265

266+
def handle_info(:run_poll_cycle, %{poll_check_in_progress: true} = state), do: {:noreply, state}
267+
265268
def handle_info(:run_poll_cycle, state) do
266-
state = maybe_dispatch(state)
267-
state = schedule_tick(state, state.poll_interval_ms)
269+
parent = self()
270+
271+
{:ok, _pid} =
272+
Task.start_link(fn ->
273+
send(parent, {:poll_cycle_complete, run_poll_cycle_safely(state)})
274+
end)
275+
276+
{:noreply, %{state | poll_check_in_progress: true}}
277+
end
278+
279+
def handle_info({:poll_cycle_complete, {:ok, poll_state}}, state) do
280+
state =
281+
state
282+
|> merge_poll_cycle_state(poll_state)
283+
|> Map.put(:poll_check_in_progress, false)
284+
|> schedule_tick(state.poll_interval_ms)
285+
286+
broadcast_snapshot(state)
287+
{:noreply, state}
288+
end
289+
290+
def handle_info({:poll_cycle_complete, {:error, reason}}, state) do
291+
Logger.error("Poll cycle failed: #{reason}")
292+
293+
state =
294+
state
295+
|> Map.put(:poll_check_in_progress, false)
296+
|> schedule_tick(state.poll_interval_ms)
297+
268298
broadcast_snapshot(state)
269299
{:noreply, state}
270300
end
@@ -516,7 +546,7 @@ defmodule Shuttle.Poller do
516546
blocked: blocked,
517547
orphans: [],
518548
retrying: retrying,
519-
standing_roles: standing_role_snapshots(state, now),
549+
standing_roles: standing_role_snapshots(state.standing_roles, state.running, now),
520550
claimed_count: MapSet.size(state.claimed),
521551
max_concurrent: state.max_concurrent_workers
522552
}
@@ -538,7 +568,11 @@ defmodule Shuttle.Poller do
538568
# Merge newly resolved host entries into the cache. Existing entries
539569
# are not evicted — earlier-configured hosts win for ID collisions,
540570
# and cache entries are stable for the daemon's lifetime.
541-
state = %{state | fiber_host_cache: Map.merge(new_host_map, state.fiber_host_cache)}
571+
state = %{
572+
state
573+
| fiber_host_cache: Map.merge(new_host_map, state.fiber_host_cache),
574+
standing_roles: standing_roles_from_candidates(candidates)
575+
}
542576

543577
eligible = candidates |> filter_eligible(state) |> sort_candidates()
544578
{deferred, dispatchable} = partition_deferred(eligible, state)
@@ -562,6 +596,30 @@ defmodule Shuttle.Poller do
562596
end
563597
end
564598

599+
defp run_poll_cycle_safely(%State{} = state) do
600+
{:ok, maybe_dispatch(state)}
601+
rescue
602+
error ->
603+
{:error, Exception.format(:error, error, __STACKTRACE__)}
604+
catch
605+
kind, reason ->
606+
{:error, Exception.format(kind, reason, __STACKTRACE__)}
607+
end
608+
609+
defp merge_poll_cycle_state(%State{} = current, %State{} = poll_state) do
610+
%{
611+
poll_state
612+
| running: Map.merge(poll_state.running, current.running),
613+
claimed: MapSet.union(poll_state.claimed, current.claimed),
614+
retry_queue: Map.merge(poll_state.retry_queue, current.retry_queue),
615+
waiters: current.waiters,
616+
reservations: current.reservations,
617+
completed_standing_runs:
618+
MapSet.union(poll_state.completed_standing_runs, current.completed_standing_runs),
619+
standing_roles: poll_state.standing_roles
620+
}
621+
end
622+
565623
# Splits eligible candidates into (deferred, dispatchable). Deferred
566624
# entries carry the origin claiming the fiber as running; the rest go
567625
# to the dispatch reduce. See "deferral logic only runs on the
@@ -683,9 +741,10 @@ defmodule Shuttle.Poller do
683741
# loom symlinking into a project-canonical host).
684742
#
685743
# Once we know which IDs are physically rooted in a host, felt becomes the
686-
# sole reader: `list_shuttle_fibers/3` shells out once per host to
687-
# `felt ls --json`, then filters that JSON payload down to the owned IDs
688-
# that actually carry a `shuttle:` block.
744+
# sole reader: `list_shuttle_fibers/3` shells out once per host to a narrow
745+
# `felt ls --json --has-field shuttle --json-field ...` projection, then
746+
# filters that JSON payload down to the owned IDs that actually carry a
747+
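# `shuttle:` block. If the projected call fails, a second, unprojected `felt ls --json` call is made for that host as a fallback.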
689748
#
690749
# The `file_identity` MapSet below is belt-and-suspenders for esoteric
691750
# cases (hard links, etc.) where two physically-distinct paths point at
@@ -761,7 +820,7 @@ defmodule Shuttle.Poller do
761820
if MapSet.size(owned_ids) == 0 do
762821
{:ok, []}
763822
else
764-
case run_felt(host, state.runner, ["ls", "--json"]) do
823+
case run_felt_ls_for_shuttle(host, state) do
765824
{:ok, output} ->
766825
with {:ok, fibers} when is_list(fibers) <- Jason.decode(output) do
767826
kept =
@@ -781,6 +840,29 @@ defmodule Shuttle.Poller do
781840
end
782841
end
783842

843+
defp run_felt_ls_for_shuttle(host, state) do
844+
projected_args = [
845+
"ls",
846+
"--json",
847+
"--has-field",
848+
"shuttle",
849+
"--json-field",
850+
@felt_shuttle_projection_fields
851+
]
852+
853+
case run_felt(host, state.runner, projected_args) do
854+
{:ok, output} ->
855+
{:ok, output}
856+
857+
{:error, reason} ->
858+
Logger.warning(
859+
"projected felt ls failed for #{host}; falling back to legacy broad listing: #{inspect(reason)}"
860+
)
861+
862+
run_felt(host, state.runner, ["ls", "--json"])
863+
end
864+
end
865+
784866
# `(major_device, inode)` from `File.stat` (follows symlinks) uniquely
785867
# identifies a physical file regardless of which symlink path you used
786868
# to reach it. Returns nil on stat failure so the caller can fall back
@@ -1038,6 +1120,27 @@ defmodule Shuttle.Poller do
10381120
end
10391121
end
10401122

1123+
defp standing_roles_from_candidates(candidates) do
1124+
Enum.flat_map(candidates, fn fiber ->
1125+
case standing_role_from_fiber(fiber) do
1126+
{:ok, role} ->
1127+
if StandingRole.standing?(role), do: [role], else: []
1128+
1129+
{:error, _} ->
1130+
[]
1131+
end
1132+
end)
1133+
end
1134+
1135+
defp standing_role_from_fiber(fiber) do
1136+
fiber_id = Map.get(fiber, "id", "")
1137+
1138+
case Map.get(fiber, "shuttle") do
1139+
shuttle when is_map(shuttle) -> StandingRole.from_map(fiber_id, shuttle)
1140+
_ -> {:error, :no_shuttle_block}
1141+
end
1142+
end
1143+
10411144
# Resolves which configured felt host owns `fiber_id`.
10421145
#
10431146
# Resolution order:
@@ -1144,10 +1247,16 @@ defmodule Shuttle.Poller do
11441247
if deps == [] or is_nil(deps) do
11451248
true
11461249
else
1147-
Enum.all?(deps, fn dep_id ->
1148-
case fetch_fiber_full(dep_id, state) do
1149-
{:ok, dep} -> Map.get(dep, "tempered", false) == true
1150-
{:error, _} -> false
1250+
Enum.all?(deps, fn dep ->
1251+
case dep_id(dep) do
1252+
nil ->
1253+
false
1254+
1255+
dep_id ->
1256+
case fetch_fiber_full(dep_id, state) do
1257+
{:ok, dep} -> Map.get(dep, "tempered", false) == true
1258+
{:error, _} -> false
1259+
end
11511260
end
11521261
end)
11531262
end
@@ -1157,6 +1266,11 @@ defmodule Shuttle.Poller do
11571266
end
11581267
end
11591268

1269+
defp dep_id(dep) when is_binary(dep), do: dep
1270+
defp dep_id(%{"id" => id}) when is_binary(id), do: id
1271+
defp dep_id(%{id: id}) when is_binary(id), do: id
1272+
defp dep_id(_), do: nil
1273+
11601274
defp sort_candidates(candidates) do
11611275
Enum.sort_by(candidates, fn fiber ->
11621276
created = Map.get(fiber, "created_at", "")
@@ -1698,26 +1812,10 @@ defmodule Shuttle.Poller do
16981812
MapSet.member?(state.completed_standing_runs, {fiber_id, run_id})
16991813
end
17001814

1701-
defp standing_role_snapshots(state, now) do
1702-
with {:ok, candidates, _host_map} <- discover_candidates(state) do
1703-
candidates
1704-
|> Enum.filter(fn fiber ->
1705-
standing_role?(fiber, state)
1706-
end)
1707-
|> Enum.flat_map(fn fiber ->
1708-
fiber_id = Map.get(fiber, "id", "")
1709-
1710-
case fetch_standing_role(fiber_id, state) do
1711-
{:ok, role} ->
1712-
[StandingRole.to_snapshot(role, now, Map.has_key?(state.running, fiber_id))]
1713-
1714-
{:error, _} ->
1715-
[]
1716-
end
1717-
end)
1718-
else
1719-
_ -> []
1720-
end
1815+
defp standing_role_snapshots(roles, running, now) do
1816+
Enum.map(roles, fn role ->
1817+
StandingRole.to_snapshot(role, now, Map.has_key?(running, role.fiber_id))
1818+
end)
17211819
end
17221820

17231821
# Run a felt CLI command against an explicit host directory.

0 commit comments

Comments
 (0)