-
Notifications
You must be signed in to change notification settings - Fork 180
Expand file tree
/
Copy pathgetKernelMsg.go
More file actions
76 lines (66 loc) · 2.64 KB
/
getKernelMsg.go
File metadata and controls
76 lines (66 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"sync/atomic"
"time"
"github.com/euank/go-kmsg-parser/kmsgparser"
"github.com/lf-edge/eve/pkg/pillar/types"
)
// getKernelMsg is intended to run as a goroutine: it reads kernel messages
// from /dev/kmsg via kmsgparser and forwards them to kernelChan.
//
// kernelChan is a channel dedicated to kernel messages, keeping the reading
// of /dev/kmsg decoupled from the main pipeline. Backpressure from downstream
// processing (vector sockets, disk I/O, gzip compression) must not stall this
// reader; otherwise the kernel ring buffer would overflow and early messages
// would be dropped silently.
//
// The function exits the process via log.Fatalf if the parser cannot be
// created, and returns once the parser's message channel is closed.
func getKernelMsg(kernelChan chan inputEntry) {
	kp, err := kmsgparser.NewParser()
	if err != nil {
		log.Fatalf("unable to create kmsg parser: %v", err)
	}
	defer kp.Close()

	// prevSeq holds the sequence number of the previously read message;
	// a negative value means no message has been read yet.
	prevSeq := -1
	for msg := range kp.Parse() {
		// /dev/kmsg numbers its messages with a monotonically increasing
		// sequence. If the current number is beyond the one we expected,
		// the kernel ring buffer overflowed and the messages in between
		// were lost (EPIPE from the reader's perspective).
		if expected := prevSeq + 1; prevSeq >= 0 && msg.SequenceNumber > expected {
			missing := uint64(msg.SequenceNumber - expected)
			logmetrics.NumKmsgDropped += missing
			log.Warnf("getKernelMsg: detected kernel log gap: %d messages lost (seq %d -> %d)",
				missing, prevSeq, msg.SequenceNumber)
		}
		prevSeq = msg.SequenceNumber

		// Start from the default level and override it only when the
		// message carries a non-negative priority; the low three bits
		// (priority modulo 8) select the level string.
		level := types.SyslogKernelDefaultLogLevel
		if msg.Priority >= 0 {
			level = types.SyslogKernelLogLevelStr[msg.Priority%8]
		}
		record := inputEntry{
			source:    "kernel",
			severity:  level,
			content:   msg.Message,
			timestamp: msg.Timestamp.Format(time.RFC3339Nano),
		}

		// Skip messages filtered out by the currently configured kernel
		// priority; they are neither counted nor forwarded.
		localPrio := atomic.LoadUint32(&kernelPrio)
		if suppressMsg(record, localPrio) {
			continue
		}

		// Mark the entry for remote delivery when its numeric level does
		// not exceed the configured remote threshold.
		remotePrio := atomic.LoadUint32(&kernelRemotePrio)
		record.sendToRemote = types.SyslogKernelLogLevelNum[record.severity] <= remotePrio

		logmetrics.NumKmessages++
		logmetrics.DevMetrics.NumInputEvent++
		log.Tracef("getKmessages (%d) entry msg %s", logmetrics.NumKmessages, record.content)

		// Hand the entry over without ever blocking. Should the kernel
		// channel be full (its capacity is chosen by the caller), the
		// message is counted and reported as dropped so that this loop
		// keeps draining /dev/kmsg and the ring buffer does not overflow.
		select {
		case kernelChan <- record:
		default:
			logmetrics.NumKmsgDropped++
			log.Warnf("getKernelMsg: kernel buffer channel full, dropping message: %s", record.content)
		}
	}

	// Reaching this point means the parser closed its channel (read error
	// or EOF); no further kernel messages will be collected.
	log.Errorf("getKernelMsg: kmsg parser channel closed, kernel log collection stopped")
}