Skip to content

Commit 0afec8f

Browse files
committed
Align multiple audit logs
1 parent 05d85c4 commit 0afec8f

File tree

2 files changed

+50
-24
lines changed

2 files changed

+50
-24
lines changed

cmd/kube-apiserver-audit-exporter/main.go

+46-20
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,56 @@ func init() {
2727
pflag.Parse()
2828
}
2929

30-
func main() {
31-
go func() {
32-
if delay > 0 {
33-
time.Sleep(delay)
30+
func monitorAndStartExporters() {
31+
for !validAuditLogs() {
32+
time.Sleep(time.Second)
33+
}
34+
35+
if delay > 0 {
36+
time.Sleep(delay)
37+
}
38+
39+
for _, path := range auditLogPath {
40+
startExporterForPath(path)
41+
}
42+
}
43+
44+
func validAuditLogs() bool {
45+
for _, p := range auditLogPath {
46+
info, err := os.Stat(p)
47+
if err != nil {
48+
slog.Warn("Failed to stat audit log", "path", p, "err", err)
49+
return false
3450
}
35-
for _, p := range auditLogPath {
36-
ns := strings.SplitN(p, ":", 2)
37-
path := ns[0]
38-
clusterLabel := cluster
39-
if len(ns) > 1 {
40-
clusterLabel = ns[1]
41-
}
42-
43-
e := exporter.NewExporter(
44-
exporter.WithReplay(replay),
45-
exporter.WithFile(path),
46-
exporter.WithClusterLabel(clusterLabel),
47-
)
48-
e.Start()
51+
if info.Size() == 0 {
52+
slog.Info("Audit log is empty, waiting for content", "path", p)
53+
return false
4954
}
50-
}()
55+
}
56+
return true
57+
}
58+
59+
func startExporterForPath(pathWithLabel string) {
60+
parts := strings.SplitN(pathWithLabel, ":", 2)
61+
path := parts[0]
62+
clusterLabel := cluster
63+
if len(parts) > 1 {
64+
clusterLabel = parts[1]
65+
}
66+
67+
e := exporter.NewExporter(
68+
exporter.WithReplay(replay),
69+
exporter.WithFile(path),
70+
exporter.WithClusterLabel(clusterLabel),
71+
)
72+
e.Start()
73+
}
74+
75+
func main() {
76+
go monitorAndStartExporters()
5177

5278
if err := exporter.ListenAndServe(address); err != nil {
53-
slog.Error("Failed", "err", err)
79+
slog.Error("Failed to start metrics server", "err", err)
5480
os.Exit(1)
5581
}
5682
}

exporter/exporter.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (p *Exporter) processAuditEvents() {
9090
// handleFileEvent processes filesystem events
9191
func (p *Exporter) handleFileEvent(path string) {
9292
if err := p.processFileUpdate(path); err != nil {
93-
slog.Error("Error processing file", "error", err)
93+
slog.Error("Error processing file", "cluster", p.clusterLabel, "error", err)
9494
}
9595
}
9696

@@ -102,10 +102,10 @@ func (p *Exporter) processFileUpdate(path string) error {
102102
}
103103

104104
if size := fileInfo.Size(); size < p.offset {
105-
slog.Info("Log file truncated, resetting offset")
105+
slog.Info("Log file truncated, resetting offset", "cluster", p.clusterLabel)
106106
p.offset = 0
107107
} else if size == p.offset {
108-
slog.Info("No new updates in log file", "offset", p.offset)
108+
slog.Info("No new updates in log file", "cluster", p.clusterLabel, "offset", p.offset)
109109
return nil
110110
}
111111

@@ -121,7 +121,7 @@ func (p *Exporter) processFileUpdate(path string) error {
121121

122122
start := time.Now()
123123
defer func() {
124-
slog.Info("File processing complete", "new_offset", p.offset, "duration", time.Since(start))
124+
slog.Info("File processing complete", "cluster", p.clusterLabel, "new_offset", p.offset, "duration", time.Since(start))
125125
}()
126126

127127
reader := bufio.NewReaderSize(file, 1<<24) // 16MB buffer

0 commit comments

Comments
 (0)