Skip to content

Commit cb169cd

Browse files
committed
Align multiple audit logs
1 parent 05d85c4 commit cb169cd

File tree

3 files changed

+68
-32
lines changed

3 files changed

+68
-32
lines changed

cmd/kube-apiserver-audit-exporter/main.go

+54-20
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,64 @@ func init() {
2727
pflag.Parse()
2828
}
2929

30-
func main() {
31-
go func() {
32-
if delay > 0 {
33-
time.Sleep(delay)
30+
func monitorAndStartExporters() {
31+
paths := make([]string, 0, len(auditLogPath))
32+
labels := make([]string, 0, len(auditLogPath))
33+
34+
for _, p := range auditLogPath {
35+
path, label := getPathAndLabel(p)
36+
paths = append(paths, path)
37+
labels = append(labels, label)
38+
}
39+
40+
for !validAuditLogs(paths) {
41+
time.Sleep(time.Second)
42+
}
43+
44+
if delay > 0 {
45+
time.Sleep(delay)
46+
}
47+
48+
for i, path := range paths {
49+
e := exporter.NewExporter(
50+
exporter.WithReplay(replay),
51+
exporter.WithFile(path),
52+
exporter.WithClusterLabel(labels[i]),
53+
)
54+
go e.Run()
55+
}
56+
}
57+
58+
func validAuditLogs(paths []string) bool {
59+
for _, p := range paths {
60+
info, err := os.Stat(p)
61+
if err != nil {
62+
slog.Warn("Failed to stat audit log", "path", p, "err", err)
63+
return false
3464
}
35-
for _, p := range auditLogPath {
36-
ns := strings.SplitN(p, ":", 2)
37-
path := ns[0]
38-
clusterLabel := cluster
39-
if len(ns) > 1 {
40-
clusterLabel = ns[1]
41-
}
42-
43-
e := exporter.NewExporter(
44-
exporter.WithReplay(replay),
45-
exporter.WithFile(path),
46-
exporter.WithClusterLabel(clusterLabel),
47-
)
48-
e.Start()
65+
if info.Size() == 0 {
66+
slog.Info("Audit log is empty, waiting for content", "path", p)
67+
return false
4968
}
50-
}()
69+
}
70+
return true
71+
}
72+
73+
func getPathAndLabel(s string) (string, string) {
74+
parts := strings.SplitN(s, ":", 2)
75+
path := parts[0]
76+
clusterLabel := cluster
77+
if len(parts) > 1 {
78+
clusterLabel = parts[1]
79+
}
80+
return path, clusterLabel
81+
}
82+
83+
func main() {
84+
go monitorAndStartExporters()
5185

5286
if err := exporter.ListenAndServe(address); err != nil {
53-
slog.Error("Failed", "err", err)
87+
slog.Error("Failed to start metrics server", "err", err)
5488
os.Exit(1)
5589
}
5690
}

exporter/exporter.go

+6-11
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ type Exporter struct {
6060
batchJobCreationTimes map[target]*time.Time
6161
}
6262

63-
func (p *Exporter) Start() {
64-
// Process audit events
65-
go p.processAuditEvents()
66-
}
67-
6863
func ListenAndServe(addr string) error {
6964
mux := http.NewServeMux()
7065
handler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
@@ -76,8 +71,8 @@ func ListenAndServe(addr string) error {
7671
return http.ListenAndServe(addr, mux)
7772
}
7873

79-
// processAuditEvents handles audit log file changes
80-
func (p *Exporter) processAuditEvents() {
74+
// Run handles audit log file changes
75+
func (p *Exporter) Run() {
8176
ticker := time.NewTicker(time.Second)
8277
defer ticker.Stop()
8378

@@ -90,7 +85,7 @@ func (p *Exporter) processAuditEvents() {
9085
// handleFileEvent processes filesystem events
9186
func (p *Exporter) handleFileEvent(path string) {
9287
if err := p.processFileUpdate(path); err != nil {
93-
slog.Error("Error processing file", "error", err)
88+
slog.Error("Error processing file", "cluster", p.clusterLabel, "error", err)
9489
}
9590
}
9691

@@ -102,10 +97,10 @@ func (p *Exporter) processFileUpdate(path string) error {
10297
}
10398

10499
if size := fileInfo.Size(); size < p.offset {
105-
slog.Info("Log file truncated, resetting offset")
100+
slog.Info("Log file truncated, resetting offset", "cluster", p.clusterLabel)
106101
p.offset = 0
107102
} else if size == p.offset {
108-
slog.Info("No new updates in log file", "offset", p.offset)
103+
slog.Info("No new updates in log file", "cluster", p.clusterLabel, "offset", p.offset)
109104
return nil
110105
}
111106

@@ -121,7 +116,7 @@ func (p *Exporter) processFileUpdate(path string) error {
121116

122117
start := time.Now()
123118
defer func() {
124-
slog.Info("File processing complete", "new_offset", p.offset, "duration", time.Since(start))
119+
slog.Info("File processing complete", "cluster", p.clusterLabel, "new_offset", p.offset, "duration", time.Since(start))
125120
}()
126121

127122
reader := bufio.NewReaderSize(file, 1<<24) // 16MB buffer

exporter/metrics.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,14 @@ func (p *Exporter) updateMetrics(clusterLabel string, event auditv1.Event) {
7676
target := buildTarget(event.ObjectRef)
7777
createTime, exists := p.podCreationTimes[target]
7878
if !exists {
79-
slog.Warn("Pod not found", "target", target)
79+
// Kueue's audit events may create pod/binding events before pod creation events
80+
user := extractUserAgent(event.UserAgent)
81+
podSchedulingLatency.WithLabelValues(
82+
clusterLabel,
83+
ns,
84+
user,
85+
).Observe(0)
86+
p.podCreationTimes[target] = nil
8087
return
8188
}
8289

0 commit comments

Comments
 (0)