Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions pkg/newlog/cmd/getKernelMsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,34 @@ import (
)

// getKernelMsg - goroutine to get from /dev/kmsg
func getKernelMsg(loggerChan chan inputEntry) {
// Writes to a dedicated kernelChan to decouple kernel log reading from
// the main pipeline. This ensures that backpressure from downstream
// processing (vector sockets, disk I/O, gzip compression) does not
// prevent reading /dev/kmsg, which would cause the kernel ring buffer
// to overflow and silently drop early messages.
func getKernelMsg(kernelChan chan inputEntry) {
parser, err := kmsgparser.NewParser()
if err != nil {
log.Fatalf("unable to create kmsg parser: %v", err)
}
defer parser.Close()

lastSeqNum := -1

kmsg := parser.Parse()
for msg := range kmsg {
// Detect gaps in kernel message sequence numbers.
// /dev/kmsg assigns a monotonically increasing sequence number
// to each message. A gap means the kernel ring buffer overflowed
// and messages were lost (EPIPE from the reader's perspective).
if lastSeqNum >= 0 && msg.SequenceNumber > lastSeqNum+1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the `lastSeqNum >= 0` check.

gap := uint64(msg.SequenceNumber - lastSeqNum - 1)
logmetrics.NumKmsgDropped += gap
log.Warnf("getKernelMsg: detected kernel log gap: %d messages lost (seq %d -> %d)",
gap, lastSeqNum, msg.SequenceNumber)
}
lastSeqNum = msg.SequenceNumber

entry := inputEntry{
source: "kernel",
severity: types.SyslogKernelDefaultLogLevel,
Expand All @@ -40,6 +59,18 @@ func getKernelMsg(loggerChan chan inputEntry) {
logmetrics.DevMetrics.NumInputEvent++
log.Tracef("getKmessages (%d) entry msg %s", logmetrics.NumKmessages, entry.content)

loggerChan <- entry
// Non-blocking send to the dedicated kernel buffer channel.
// If the kernel buffer is full (extremely unlikely with 500 slots),
// log the drop and count it — but never block, so we keep
// draining /dev/kmsg and prevent ring buffer overflow.
select {
case kernelChan <- entry:
default:
logmetrics.NumKmsgDropped++
log.Warnf("getKernelMsg: kernel buffer channel full, dropping message: %s", entry.content)
}
}
// If we get here, the kmsg parser channel was closed (read error or EOF).
// Log this as an error — no more kernel messages will be collected.
log.Errorf("getKernelMsg: kmsg parser channel closed, kernel log collection stopped")
}
35 changes: 28 additions & 7 deletions pkg/newlog/cmd/newlogd.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,24 @@ func main() {

log.Functionf("newlogd: starting... restarted %v", restarted)

loggerChan := make(chan inputEntry, 10)
movefileChan := make(chan fileChanInfo, 5)
loggerChan := make(chan inputEntry, 500)
movefileChan := make(chan fileChanInfo, 20)
panicFileChan := make(chan []byte, 2)

appLogChan := make(chan appLog)
uploadLogChan := make(chan string)
keepLogChan := make(chan string)
// Dedicated kernel message buffer. Kernel messages are rare but
// critical (OOM, hardware errors, panics). Under heavy load the
// main loggerChan can fill up because memlogd produces orders of
// magnitude more messages. If getKernelMsg sent directly to
// loggerChan, it would block whenever loggerChan is full and stop
// reading /dev/kmsg, causing the kernel ring buffer to overflow
// and silently drop the earliest messages — exactly the ones
// needed to diagnose the problem. A large dedicated buffer lets
// getKernelMsg keep draining /dev/kmsg independently of
// downstream backpressure.
kernelChan := make(chan inputEntry, 500)

appLogChan := make(chan appLog, 100)
uploadLogChan := make(chan string, 100)
keepLogChan := make(chan string, 100)

ps := *pubsub.New(&socketdriver.SocketDriver{Logger: logger, Log: log}, logger, log)

Expand Down Expand Up @@ -277,8 +288,18 @@ func main() {
// put the logs through vector before writing to logfiles
go sendLogsToVector(loggerChan, appLogChan, uploadLogChan, keepLogChan)

// handle the kernel messages
go getKernelMsg(loggerChan)
// handle the kernel messages into a dedicated buffer channel
go getKernelMsg(kernelChan)

// merge kernel messages from the dedicated buffer into the main
// pipeline. This goroutine may block on loggerChan when the
// downstream is slow; the 500-slot kernelChan absorbs that delay,
// and because getKernelMsg uses a non-blocking send it never
// stops reading /dev/kmsg even if kernelChan itself fills up.
go func() {
for entry := range kernelChan {
loggerChan <- entry
}
}()

// handle collect other container log messages from memlogd
go getMemlogMsg(loggerChan, panicFileChan)
Expand Down
28 changes: 23 additions & 5 deletions pkg/newlog/cmd/writelogFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,23 +367,41 @@ func trigMoveToGzip(stats *statsLogFile, appUUID string, moveChan chan fileChanI
return
}

if err := stats.file.Close(); err != nil {
log.Fatal(err)
}

fileinfo := fileChanInfo{
isApp: appUUID != "",
inputSize: stats.size,
tmpfile: stats.file.Name(),
notUpload: stats.notUpload,
}

// Non-blocking send to moveChan. If the main goroutine is busy
// with gzip compression, moveChan may be full. In that case we
// must not block — blocking here freezes writelogFile, which
// blocks the downstream channels (appLogChan, uploadLogChan,
// keepLogChan), which blocks sendLogsToVector, which fills
// loggerChan. Once loggerChan is full, the goroutine forwarding
// kernelChan into loggerChan stalls, kernelChan fills up, and
// getKernelMsg starts dropping kernel messages.
// Instead, leave the current file open and let it grow beyond the
// size limit until a later trigger attempt succeeds.
select {
case moveChan <- fileinfo:
// Successfully queued for compression.
default:
log.Warnf("trigMoveToGzip: moveChan full, deferring gzip move for %s (size %d)",
stats.file.Name(), stats.size)
return
}

if err := stats.file.Close(); err != nil {
log.Fatal(err)
}

if timerTrig {
log.Function("Move log file ", stats.file.Name(), " to gzip. Size: ", stats.size, " Reason timer")
} else {
log.Function("Move log file ", stats.file.Name(), " to gzip. Size: ", stats.size, " Reason size")
}
moveChan <- fileinfo

if fileinfo.isApp { // appM stats and logfile is created when needed
delete(appStatsMap, appUUID)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading