Skip to content

Commit e2f212c

Browse files
committed
yangerd: Change containers from polled to reactive
1 parent e316833 commit e2f212c

3 files changed

Lines changed: 283 additions & 13 deletions

File tree

src/yangerd/cmd/yangerd/main.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/kernelkit/infix/src/yangerd/internal/bridgebatch"
1818
"github.com/kernelkit/infix/src/yangerd/internal/collector"
1919
"github.com/kernelkit/infix/src/yangerd/internal/config"
20+
"github.com/kernelkit/infix/src/yangerd/internal/containermonitor"
2021
"github.com/kernelkit/infix/src/yangerd/internal/dbusmonitor"
2122
"github.com/kernelkit/infix/src/yangerd/internal/ethmonitor"
2223
"github.com/kernelkit/infix/src/yangerd/internal/frrvty"
@@ -235,6 +236,17 @@ func main() {
235236
}()
236237
}
237238

239+
if cfg.EnableContainers {
240+
ctrmon := containermonitor.New(t, cmd, fs, slogLog)
241+
wg.Add(1)
242+
go func() {
243+
defer wg.Done()
244+
if err := ctrmon.Run(ctx); err != nil && ctx.Err() == nil {
245+
slogLog.Error("containermonitor exited", "err", err)
246+
}
247+
}()
248+
}
249+
238250
zapi := zapiwatcher.New(t, frrvty.New(""), slogLog)
239251
wg.Add(1)
240252
go func() {
@@ -338,19 +350,8 @@ func main() {
338350
slogLog.Warn("fswatcher dns watch failed", "path", path, "err", err)
339351
}
340352
}
341-
if cfg.EnableContainers {
342-
containerHandler := fswatcher.WatchHandler{
343-
TreeKey: "infix-containers:containers",
344-
ReadFunc: func(_ string) (json.RawMessage, error) {
345-
return collector.CollectContainers(cmd, fs), nil
346-
},
347-
Debounce: 500 * time.Millisecond,
348-
}
349-
os.MkdirAll("/run/libpod/events", 0755)
350-
if err := fsw.WatchDir("/run/libpod/events", containerHandler); err != nil {
351-
slogLog.Warn("fswatcher container watch failed", "err", err)
352-
}
353-
}
353+
// Container operational data is handled by containermonitor (a
354+
// `podman events` stream), not the fswatcher.
354355
fsw.InitialRead()
355356
wg.Add(1)
356357
go func() {
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Package containermonitor keeps the infix-containers subtree in the tree
2+
// in sync with podman. A persistent `podman events` subprocess is used
3+
// purely as a change trigger; on every event the full container table is
4+
// re-read with `podman ps` (via collector.CollectContainers) and the
5+
// subtree replaced, so removed containers disappear and containers present
6+
// before yangerd started are picked up.
7+
//
8+
// This replaces an earlier inotify watch on /run/libpod/events, which was
9+
// reactive-only and silently went stale whenever an event was missed
10+
// (debounce coalescing, inotify overflow, a removal racing the re-read, or
11+
// yangerd starting after the container). `podman events` reads whichever
12+
// events backend podman is configured for (file or journald), so it does
13+
// not depend on a specific on-disk layout.
14+
package containermonitor
15+
16+
import (
17+
"bufio"
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"io"
22+
"log/slog"
23+
"os/exec"
24+
"time"
25+
26+
"github.com/kernelkit/infix/src/yangerd/internal/backoff"
27+
"github.com/kernelkit/infix/src/yangerd/internal/collector"
28+
"github.com/kernelkit/infix/src/yangerd/internal/tree"
29+
)
30+
31+
const (
32+
treeKey = "infix-containers:containers"
33+
34+
// debounceDelay coalesces bursts of events into one re-read.
35+
debounceDelay = 200 * time.Millisecond
36+
)
37+
38+
// ContainerMonitor subscribes to container lifecycle events via a
39+
// persistent `podman events` subprocess and re-reads the full container
40+
// table on every event.
41+
type ContainerMonitor struct {
42+
tree *tree.Tree
43+
log *slog.Logger
44+
refresh chan struct{}
45+
46+
// collect returns the current container subtree, or nil when there are
47+
// no containers; overridable in tests.
48+
collect func() json.RawMessage
49+
}
50+
51+
// New creates a ContainerMonitor.
52+
func New(t *tree.Tree, cmd collector.CommandRunner, fs collector.FileReader, log *slog.Logger) *ContainerMonitor {
53+
if log == nil {
54+
log = slog.Default()
55+
}
56+
return &ContainerMonitor{
57+
tree: t,
58+
log: log,
59+
refresh: make(chan struct{}, 1),
60+
collect: func() json.RawMessage { return collector.CollectContainers(cmd, fs) },
61+
}
62+
}
63+
64+
// Run starts the container monitor. It blocks until ctx is cancelled,
65+
// restarting the events subprocess with backoff if it exits.
66+
func (m *ContainerMonitor) Run(ctx context.Context) error {
67+
go m.refreshLoop(ctx)
68+
69+
bo := backoff.Default()
70+
delay := bo.Initial
71+
72+
for {
73+
err := m.runOnce(ctx)
74+
if ctx.Err() != nil {
75+
return ctx.Err()
76+
}
77+
78+
m.log.Warn("container monitor: subprocess exited, restarting",
79+
"err", err, "delay", delay)
80+
if err := backoff.Sleep(ctx, delay); err != nil {
81+
return err
82+
}
83+
delay = bo.Next(delay)
84+
}
85+
}
86+
87+
func (m *ContainerMonitor) runOnce(ctx context.Context) error {
88+
cmd := exec.CommandContext(ctx, "podman", "events", "--filter", "type=container", "--format", "json")
89+
stdout, err := cmd.StdoutPipe()
90+
if err != nil {
91+
return fmt.Errorf("stdout pipe: %w", err)
92+
}
93+
if err := cmd.Start(); err != nil {
94+
return fmt.Errorf("start podman events: %w", err)
95+
}
96+
defer cmd.Wait()
97+
98+
// Pick up containers that existed before we attached.
99+
m.triggerRefresh()
100+
101+
return m.readEvents(stdout)
102+
}
103+
104+
// readEvents consumes the newline-delimited JSON event stream. Each event
105+
// is only a trigger; the payload is never used to build state.
106+
func (m *ContainerMonitor) readEvents(r io.Reader) error {
107+
scanner := bufio.NewScanner(r)
108+
scanner.Buffer(make([]byte, 0, 64*1024), 1*1024*1024)
109+
110+
for scanner.Scan() {
111+
line := scanner.Bytes()
112+
if len(line) == 0 {
113+
continue
114+
}
115+
if status := eventStatus(line); status != "" {
116+
m.log.Debug("container monitor: event", "status", status)
117+
}
118+
m.triggerRefresh()
119+
}
120+
if err := scanner.Err(); err != nil {
121+
return fmt.Errorf("read podman events: %w", err)
122+
}
123+
return fmt.Errorf("podman events process exited")
124+
}
125+
126+
// eventStatus extracts the event status for logging; best-effort only.
127+
func eventStatus(line []byte) string {
128+
var ev struct {
129+
Status string `json:"Status"`
130+
}
131+
if json.Unmarshal(line, &ev) != nil {
132+
return ""
133+
}
134+
return ev.Status
135+
}
136+
137+
// triggerRefresh requests a table re-read; the buffered channel collapses
138+
// pending requests into one.
139+
func (m *ContainerMonitor) triggerRefresh() {
140+
select {
141+
case m.refresh <- struct{}{}:
142+
default:
143+
}
144+
}
145+
146+
func (m *ContainerMonitor) refreshLoop(ctx context.Context) {
147+
for {
148+
select {
149+
case <-ctx.Done():
150+
return
151+
case <-m.refresh:
152+
}
153+
154+
// Let a burst of events settle before reading.
155+
select {
156+
case <-ctx.Done():
157+
return
158+
case <-time.After(debounceDelay):
159+
}
160+
select {
161+
case <-m.refresh:
162+
default:
163+
}
164+
165+
m.updateTree()
166+
}
167+
}
168+
169+
// updateTree re-reads the full container table and replaces the subtree.
170+
// With no containers the key is deleted rather than left as an empty node,
171+
// so an idle-but-enabled container feature reads as absent.
172+
func (m *ContainerMonitor) updateTree() {
173+
data := m.collect()
174+
if len(data) == 0 {
175+
m.tree.Delete(treeKey)
176+
m.log.Debug("container monitor: no containers, key removed")
177+
return
178+
}
179+
m.tree.Set(treeKey, data)
180+
m.log.Debug("container monitor: tree updated")
181+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package containermonitor
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/kernelkit/infix/src/yangerd/internal/tree"
11+
)
12+
13+
// newTestMonitor builds a monitor whose collect() is driven by the test.
14+
// cmd/fs are nil since collect is overridden, so the default closure that
15+
// would use them is never called.
16+
func newTestMonitor(t *testing.T, collect func() json.RawMessage) (*ContainerMonitor, *tree.Tree) {
17+
t.Helper()
18+
tr := tree.New()
19+
m := New(tr, nil, nil, nil)
20+
m.collect = collect
21+
return m, tr
22+
}
23+
24+
func TestUpdateTreeSetsContainers(t *testing.T) {
25+
m, tr := newTestMonitor(t, func() json.RawMessage {
26+
return json.RawMessage(`{"container":[{"name":"web"}]}`)
27+
})
28+
29+
m.updateTree()
30+
31+
got := tr.Get(treeKey)
32+
if got == nil || !strings.Contains(string(got), "web") {
33+
t.Fatalf("expected container data, got %s", got)
34+
}
35+
}
36+
37+
// With no containers the key must be deleted, not left as an empty node,
38+
// so an idle-but-enabled container feature reads as absent.
39+
func TestUpdateTreeDeletesWhenEmpty(t *testing.T) {
40+
m, tr := newTestMonitor(t, func() json.RawMessage { return nil })
41+
42+
tr.Set(treeKey, json.RawMessage(`{"container":[{"name":"old"}]}`))
43+
m.updateTree()
44+
45+
if got := tr.Get(treeKey); got != nil {
46+
t.Fatalf("expected key removed when no containers, got %s", got)
47+
}
48+
}
49+
50+
// An event in the stream must trigger a re-read; here the re-read clears a
51+
// previously-present container, proving the stream drives reconciliation.
52+
func TestEventTriggersRefresh(t *testing.T) {
53+
calls := 0
54+
m, tr := newTestMonitor(t, func() json.RawMessage {
55+
calls++
56+
return nil // container is gone
57+
})
58+
tr.Set(treeKey, json.RawMessage(`{"container":[{"name":"gone"}]}`))
59+
60+
ctx, cancel := context.WithCancel(context.Background())
61+
defer cancel()
62+
go m.refreshLoop(ctx)
63+
64+
// A container "died" event, newline-framed as podman emits it.
65+
go m.readEvents(strings.NewReader(`{"Type":"container","Status":"died","Name":"gone"}` + "\n"))
66+
67+
deadline := time.After(2 * time.Second)
68+
for {
69+
if tr.Get(treeKey) == nil && calls > 0 {
70+
break
71+
}
72+
select {
73+
case <-deadline:
74+
t.Fatalf("event did not trigger reconcile; calls=%d tree=%s", calls, tr.Get(treeKey))
75+
default:
76+
time.Sleep(10 * time.Millisecond)
77+
}
78+
}
79+
}
80+
81+
func TestEventStatus(t *testing.T) {
82+
if s := eventStatus([]byte(`{"Status":"start"}`)); s != "start" {
83+
t.Errorf("eventStatus = %q, want start", s)
84+
}
85+
if s := eventStatus([]byte(`not json`)); s != "" {
86+
t.Errorf("eventStatus on garbage = %q, want empty", s)
87+
}
88+
}

0 commit comments

Comments
 (0)