Skip to content

Commit f8064f8

Browse files
committed
feat(traffic): add per-node traffic collection via nftables counters
Implement network node traffic monitoring using nftables inline counters in a separate table (system_control_stats) to avoid interference with NAT rules. Fix 30m+ interval queries that returned empty data because aggregated buckets didn't exist for recent time windows. - Add SetupNodeCounters/CollectNodeCounters/CleanupNodeCounters to traffic engine - Collect per-node traffic deltas in collector alongside interface stats - Auto-refresh nftables counters when nodes are created/updated/deleted - Combine raw + aggregated data in service queries to eliminate gaps - Move interval selector below header, make sidebar sticky
1 parent dd7e1f2 commit f8064f8

9 files changed

Lines changed: 360 additions & 48 deletions

File tree

cmd/server/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ func main() {
147147
// Serve frontend SPA
148148
r.Handle("/*", spaHandler())
149149

150+
// Wire traffic collector as node change listener
151+
nodesSvc.SetNodeChangeListener(trafficCollector)
152+
150153
// Start node monitoring
151154
nodesMonitor.Start()
152155

internal/network_nodes/service.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,32 @@ import (
66
"github.com/sokol/system-control/internal/pkg/validate"
77
)
88

9+
// NodeChangeListener is notified when nodes are created, updated, or deleted.
10+
type NodeChangeListener interface {
11+
RefreshNodes()
12+
}
13+
914
type Service struct {
10-
repo *Repository
11-
monitor *Monitor
15+
repo *Repository
16+
monitor *Monitor
17+
listener NodeChangeListener
1218
}
1319

1420
func NewService(repo *Repository, monitor *Monitor) *Service {
1521
return &Service{repo: repo, monitor: monitor}
1622
}
1723

24+
// SetNodeChangeListener registers a listener for node changes.
25+
func (s *Service) SetNodeChangeListener(l NodeChangeListener) {
26+
s.listener = l
27+
}
28+
29+
func (s *Service) notifyChange() {
30+
if s.listener != nil {
31+
go s.listener.RefreshNodes()
32+
}
33+
}
34+
1835
func (s *Service) GetStatuses() []NodeStatus {
1936
return s.monitor.GetStatuses()
2037
}
@@ -35,6 +52,7 @@ func (s *Service) Create(req CreateNodeRequest) (*NetworkNode, error) {
3552
if err := s.repo.Create(node); err != nil {
3653
return nil, fmt.Errorf("create node: %w", err)
3754
}
55+
s.notifyChange()
3856
return node, nil
3957
}
4058

@@ -50,9 +68,14 @@ func (s *Service) Update(id int64, req UpdateNodeRequest) (*NetworkNode, error)
5068
if err := s.repo.Update(node); err != nil {
5169
return nil, fmt.Errorf("update node: %w", err)
5270
}
71+
s.notifyChange()
5372
return node, nil
5473
}
5574

5675
func (s *Service) Delete(id int64) error {
57-
return s.repo.Delete(id)
76+
if err := s.repo.Delete(id); err != nil {
77+
return err
78+
}
79+
s.notifyChange()
80+
return nil
5881
}

internal/traffic/collector.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ type Collector struct {
1919

2020
// Previous counter values for computing deltas
2121
prevIface map[string]InterfaceSnapshot
22+
prevNode map[int64]NodeTrafficSnapshot
23+
nodesReady bool
2224
}
2325

2426
func NewCollector(repo *Repository, engine *Engine, nodesRepo *network_nodes.Repository, interfaces []string, interval time.Duration) *Collector {
@@ -30,6 +32,7 @@ func NewCollector(repo *Repository, engine *Engine, nodesRepo *network_nodes.Rep
3032
collectInterval: interval,
3133
stop: make(chan struct{}),
3234
prevIface: make(map[string]InterfaceSnapshot),
35+
prevNode: make(map[int64]NodeTrafficSnapshot),
3336
}
3437
}
3538

@@ -40,6 +43,7 @@ func (c *Collector) Start() {
4043

4144
func (c *Collector) Stop() {
4245
close(c.stop)
46+
_ = c.engine.CleanupNodeCounters()
4347
}
4448

4549
func (c *Collector) collectLoop() {
@@ -83,6 +87,71 @@ func (c *Collector) collect() {
8387
}
8488
c.prevIface[snap.Name] = snap
8589
}
90+
91+
// Setup node counters on first run
92+
if !c.nodesReady {
93+
c.setupNodes()
94+
}
95+
96+
// Collect node counters
97+
if c.nodesReady {
98+
c.collectNodes(now)
99+
}
100+
}
101+
102+
func (c *Collector) setupNodes() {
103+
nodes, err := c.nodesRepo.GetAll()
104+
if err != nil {
105+
slog.Error("traffic: failed to get nodes for counter setup", "error", err)
106+
return
107+
}
108+
109+
infos := make([]NodeInfo, len(nodes))
110+
for i, n := range nodes {
111+
infos[i] = NodeInfo{ID: n.ID, IP: n.IP}
112+
}
113+
114+
if err := c.engine.SetupNodeCounters(infos); err != nil {
115+
slog.Error("traffic: failed to setup node counters", "error", err)
116+
return
117+
}
118+
119+
c.nodesReady = true
120+
c.prevNode = make(map[int64]NodeTrafficSnapshot)
121+
}
122+
123+
func (c *Collector) collectNodes(now time.Time) {
124+
nodeSnaps, err := c.engine.CollectNodeCounters()
125+
if err != nil {
126+
slog.Error("traffic: failed to collect node counters", "error", err)
127+
return
128+
}
129+
130+
for _, snap := range nodeSnaps {
131+
prev, exists := c.prevNode[snap.NodeID]
132+
if exists && snap.BytesIn >= prev.BytesIn && snap.BytesOut >= prev.BytesOut {
133+
deltaIn := int64(snap.BytesIn - prev.BytesIn)
134+
deltaOut := int64(snap.BytesOut - prev.BytesOut)
135+
if deltaIn > 0 || deltaOut > 0 {
136+
nodeID := snap.NodeID
137+
if err := c.repo.InsertRaw(&nodeID, "", deltaIn, deltaOut, now); err != nil {
138+
slog.Error("traffic: failed to insert node data", "nodeId", snap.NodeID, "error", err)
139+
}
140+
}
141+
}
142+
c.prevNode[snap.NodeID] = snap
143+
}
144+
}
145+
146+
// RefreshNodes re-creates nftables counter rules for current nodes.
147+
// Called when nodes are added, updated, or deleted.
148+
func (c *Collector) RefreshNodes() {
149+
c.mu.Lock()
150+
defer c.mu.Unlock()
151+
152+
c.nodesReady = false
153+
c.prevNode = make(map[int64]NodeTrafficSnapshot)
154+
c.setupNodes()
86155
}
87156

88157
func (c *Collector) aggregateLoop() {

internal/traffic/engine_linux.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,179 @@ package traffic
55
import (
66
"bufio"
77
"fmt"
8+
"log/slog"
9+
"net"
810
"os"
11+
"strconv"
912
"strings"
13+
14+
"github.com/google/nftables"
15+
"github.com/google/nftables/expr"
1016
)
1117

18+
const statsTableName = "system_control_stats"
19+
20+
// SetupNodeCounters creates nftables counting rules for each node IP.
21+
// Uses a separate table to avoid interference with NAT rules.
22+
func (e *Engine) SetupNodeCounters(nodes []NodeInfo) error {
23+
conn, err := nftables.New()
24+
if err != nil {
25+
return fmt.Errorf("nftables connect: %w", err)
26+
}
27+
28+
// Delete existing stats table (ignore error if not exists)
29+
conn.DelTable(&nftables.Table{Name: statsTableName, Family: nftables.TableFamilyIPv4})
30+
_ = conn.Flush()
31+
32+
if len(nodes) == 0 {
33+
return nil
34+
}
35+
36+
conn, err = nftables.New()
37+
if err != nil {
38+
return fmt.Errorf("nftables connect: %w", err)
39+
}
40+
41+
table := conn.AddTable(&nftables.Table{
42+
Name: statsTableName,
43+
Family: nftables.TableFamilyIPv4,
44+
})
45+
46+
// Forward chain to count traffic passing through this host to/from nodes
47+
chain := conn.AddChain(&nftables.Chain{
48+
Name: "traffic_count",
49+
Table: table,
50+
Type: nftables.ChainTypeFilter,
51+
Hooknum: nftables.ChainHookForward,
52+
Priority: nftables.ChainPriorityFilter,
53+
})
54+
55+
for _, node := range nodes {
56+
ip := net.ParseIP(node.IP).To4()
57+
if ip == nil {
58+
slog.Error("traffic: invalid node IP, skipping", "nodeId", node.ID, "ip", node.IP)
59+
continue
60+
}
61+
62+
nodeIDStr := strconv.FormatInt(node.ID, 10)
63+
64+
// Rule: match dst IP = node.IP → count incoming traffic to node
65+
conn.AddRule(&nftables.Rule{
66+
Table: table,
67+
Chain: chain,
68+
Exprs: []expr.Any{
69+
&expr.Payload{DestRegister: 1, Base: expr.PayloadBaseNetworkHeader, Offset: 16, Len: 4},
70+
&expr.Cmp{Op: expr.CmpOpEq, Register: 1, Data: ip},
71+
&expr.Counter{},
72+
},
73+
UserData: []byte("node:" + nodeIDStr + ":in"),
74+
})
75+
76+
// Rule: match src IP = node.IP → count outgoing traffic from node
77+
conn.AddRule(&nftables.Rule{
78+
Table: table,
79+
Chain: chain,
80+
Exprs: []expr.Any{
81+
&expr.Payload{DestRegister: 1, Base: expr.PayloadBaseNetworkHeader, Offset: 12, Len: 4},
82+
&expr.Cmp{Op: expr.CmpOpEq, Register: 1, Data: ip},
83+
&expr.Counter{},
84+
},
85+
UserData: []byte("node:" + nodeIDStr + ":out"),
86+
})
87+
}
88+
89+
if err := conn.Flush(); err != nil {
90+
return fmt.Errorf("nftables flush stats: %w", err)
91+
}
92+
93+
slog.Info("traffic: node counters set up", "nodes", len(nodes))
94+
return nil
95+
}
96+
97+
// CollectNodeCounters reads nftables counter values for all node rules.
98+
func (e *Engine) CollectNodeCounters() ([]NodeTrafficSnapshot, error) {
99+
conn, err := nftables.New()
100+
if err != nil {
101+
return nil, fmt.Errorf("nftables connect: %w", err)
102+
}
103+
104+
table := &nftables.Table{Name: statsTableName, Family: nftables.TableFamilyIPv4}
105+
chain := &nftables.Chain{Name: "traffic_count", Table: table}
106+
107+
rules, err := conn.GetRules(table, chain)
108+
if err != nil {
109+
return nil, fmt.Errorf("get rules: %w", err)
110+
}
111+
112+
// Aggregate counters by node ID
113+
type counterPair struct {
114+
bytesIn uint64
115+
bytesOut uint64
116+
}
117+
counters := make(map[int64]*counterPair)
118+
119+
for _, rule := range rules {
120+
if len(rule.UserData) == 0 {
121+
continue
122+
}
123+
tag := string(rule.UserData)
124+
// Format: "node:{id}:{in|out}"
125+
parts := strings.SplitN(tag, ":", 3)
126+
if len(parts) != 3 || parts[0] != "node" {
127+
continue
128+
}
129+
130+
nodeID, err := strconv.ParseInt(parts[1], 10, 64)
131+
if err != nil {
132+
continue
133+
}
134+
direction := parts[2]
135+
136+
// Find counter expression in rule
137+
var bytes uint64
138+
for _, e := range rule.Exprs {
139+
if c, ok := e.(*expr.Counter); ok {
140+
bytes = c.Bytes
141+
break
142+
}
143+
}
144+
145+
pair, ok := counters[nodeID]
146+
if !ok {
147+
pair = &counterPair{}
148+
counters[nodeID] = pair
149+
}
150+
151+
switch direction {
152+
case "in":
153+
pair.bytesIn = bytes
154+
case "out":
155+
pair.bytesOut = bytes
156+
}
157+
}
158+
159+
results := make([]NodeTrafficSnapshot, 0, len(counters))
160+
for nodeID, pair := range counters {
161+
results = append(results, NodeTrafficSnapshot{
162+
NodeID: nodeID,
163+
BytesIn: pair.bytesIn,
164+
BytesOut: pair.bytesOut,
165+
})
166+
}
167+
168+
return results, nil
169+
}
170+
171+
// CleanupNodeCounters removes the stats table.
172+
func (e *Engine) CleanupNodeCounters() error {
173+
conn, err := nftables.New()
174+
if err != nil {
175+
return fmt.Errorf("nftables connect: %w", err)
176+
}
177+
conn.DelTable(&nftables.Table{Name: statsTableName, Family: nftables.TableFamilyIPv4})
178+
return conn.Flush()
179+
}
180+
12181
// CollectInterfaces reads /proc/net/dev and returns per-interface byte counters.
13182
func (e *Engine) CollectInterfaces(names []string) ([]InterfaceSnapshot, error) {
14183
f, err := os.Open("/proc/net/dev")

internal/traffic/engine_stub.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,20 @@ func (e *Engine) CollectInterfaces(names []string) ([]InterfaceSnapshot, error)
99
slog.Debug("traffic engine: interface collection is only supported on Linux")
1010
return nil, nil
1111
}
12+
13+
// SetupNodeCounters is a no-op on non-Linux platforms.
14+
func (e *Engine) SetupNodeCounters(nodes []NodeInfo) error {
15+
slog.Debug("traffic engine: node counters are only supported on Linux")
16+
return nil
17+
}
18+
19+
// CollectNodeCounters is a no-op on non-Linux platforms.
20+
func (e *Engine) CollectNodeCounters() ([]NodeTrafficSnapshot, error) {
21+
slog.Debug("traffic engine: node counters are only supported on Linux")
22+
return nil, nil
23+
}
24+
25+
// CleanupNodeCounters is a no-op on non-Linux platforms.
26+
func (e *Engine) CleanupNodeCounters() error {
27+
return nil
28+
}

internal/traffic/models.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,9 @@ type NodeTrafficSnapshot struct {
3030
BytesIn uint64
3131
BytesOut uint64
3232
}
33+
34+
// NodeInfo is a lightweight struct for setting up node counters.
35+
type NodeInfo struct {
36+
ID int64
37+
IP string
38+
}

0 commit comments

Comments
 (0)