From de0ff675f0739f7b1c41053a3d3e5fdb43ca1d84 Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Mon, 22 Sep 2025 17:45:23 +0800 Subject: [PATCH 1/7] feat: implement async logger feat: implement async logger feat: implement async logger --- cmd/base/options/generic.go | 2 +- cmd/base/options/log.go | 20 ++++++-- cmd/katalyst-agent/app/agent.go | 15 +++++- go.mod | 5 +- go.sum | 14 +++++- pkg/config/generic/generic_base.go | 2 + pkg/config/generic/log.go | 28 +++++++++++ pkg/util/logging/logger.go | 75 ++++++++++++++++++++++++++++++ 8 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 pkg/config/generic/log.go create mode 100644 pkg/util/logging/logger.go diff --git a/cmd/base/options/generic.go b/cmd/base/options/generic.go index ed9b9f6351..01689fc1be 100644 --- a/cmd/base/options/generic.go +++ b/cmd/base/options/generic.go @@ -115,7 +115,7 @@ func (o *GenericOptions) ApplyTo(c *generic.GenericConfiguration) error { errList := make([]error, 0, 1) errList = append(errList, o.qosOptions.ApplyTo(c.QoSConfiguration)) errList = append(errList, o.metricsOptions.ApplyTo(c.MetricsConfiguration)) - errList = append(errList, o.logsOptions.ApplyTo()) + errList = append(errList, o.logsOptions.ApplyTo(c.LogConfiguration)) errList = append(errList, o.authOptions.ApplyTo(c.AuthConfiguration)) c.ClientConnection.QPS = o.QPS diff --git a/cmd/base/options/log.go b/cmd/base/options/log.go index 356cbed78e..857be70f73 100644 --- a/cmd/base/options/log.go +++ b/cmd/base/options/log.go @@ -19,18 +19,25 @@ package options import ( "github.com/spf13/pflag" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/util/general" ) type LogsOptions struct { - LogPackageLevel general.LoggingPKG - LogFileMaxSizeInMB uint64 + LogPackageLevel general.LoggingPKG + LogFileMaxSizeInMB uint64 + SupportAsyncLogging bool + LogDir string + LogBufferSizeMB int } func NewLogsOptions() *LogsOptions { return &LogsOptions{ LogPackageLevel: general.LoggingPKGFull, LogFileMaxSizeInMB: 1800, + LogDir: "/opt/tiger/toutiao/log/app", + LogBufferSizeMB: 10000, } } @@ -38,10 +45,17 @@ func NewLogsOptions() *LogsOptions { func (o *LogsOptions) AddFlags(fs *pflag.FlagSet) { fs.Var(&o.LogPackageLevel, "logs-package-level", "the default package level for logging") fs.Uint64Var(&o.LogFileMaxSizeInMB, "log-file-max-size", o.LogFileMaxSizeInMB, "Max size of klog file in MB.") + fs.BoolVar(&o.SupportAsyncLogging, "support-async-logging", o.SupportAsyncLogging, "whether to support async logging") + fs.StringVar(&o.LogDir, "log-dir", o.LogDir, "directory of log file") + fs.IntVar(&o.LogBufferSizeMB, "log-buffer-size", o.LogBufferSizeMB, "size of the ring buffer to store async logs") } -func (o *LogsOptions) ApplyTo() error { +func (o *LogsOptions) ApplyTo(c *generic.LogConfiguration) error { general.SetDefaultLoggingPackage(o.LogPackageLevel) general.SetLogFileMaxSize(o.LogFileMaxSizeInMB) + c.SupportAsyncLogging = o.SupportAsyncLogging + c.LogDir = o.LogDir + c.LogFileMaxSize = int(o.LogFileMaxSizeInMB) + c.LogBufferSizeMB = o.LogBufferSizeMB return nil } diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index 2267ee0708..16259b36b7 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -32,6 +32,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/logging" "github.com/kubewharf/katalyst-core/pkg/util/process" ) @@ -42,10 +43,14 @@ const ( metricsNameAgentStarted = "agent_started" ) +const defaultLogFileName = "/opt/tiger/toutiao/log/app/agent.log" + // Run starts common and uniformed agent components here, and starts other // specific components in other separate repos (with common components as // dependencies) -func Run(conf *config.Configuration, clientSet *client.GenericClientSet, genericOptions ...katalystbase.GenericOptions) error { +func Run( + conf *config.Configuration, clientSet *client.GenericClientSet, genericOptions ...katalystbase.GenericOptions, +) error { // Set up signals so that we handle the first shutdown signal gracefully. ctx := process.SetupSignalHandler() @@ -60,6 +65,11 @@ func Run(conf *config.Configuration, clientSet *client.GenericClientSet, generic return err } + if conf.SupportAsyncLogging { + asyncLogger := logging.NewAsyncLogger(genericCtx, defaultLogFileName, conf.LogFileMaxSize, conf.LogBufferSizeMB) + defer asyncLogger.Shutdown() + } + for _, genericOption := range genericOptions { genericOption(genericCtx) } @@ -78,7 +88,8 @@ func Run(conf *config.Configuration, clientSet *client.GenericClientSet, generic } // startAgent is used to initialize and start each component in katalyst-agent -func startAgent(ctx context.Context, genericCtx *agent.GenericContext, +func startAgent( + ctx context.Context, genericCtx *agent.GenericContext, conf *config.Configuration, agents map[string]AgentStarter, ) error { componentMap := make(map[string]agent.Component) diff --git a/go.mod b/go.mod index ad98477791..9b520fddaa 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/prometheus/common v0.37.0 github.com/prometheus/procfs v0.10.0 github.com/robfig/cron/v3 v3.0.1 + github.com/rs/zerolog v1.34.0 github.com/safchain/ethtool v0.5.10 github.com/samber/lo v1.39.0 github.com/slok/kubewebhook v0.11.0 @@ -49,6 +50,7 @@ require ( golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 gonum.org/v1/gonum v0.8.2 google.golang.org/grpc v1.57.1 + gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.0.3 k8s.io/api v0.26.1 @@ -83,7 +85,7 @@ require ( github.com/containerd/console v1.0.3 // indirect github.com/containerd/ttrpc v1.2.3-0.20231030150553-baadfd8e7956 // indirect github.com/coreos/go-semver v0.3.0 // indirect - github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.2.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect @@ -159,7 +161,6 @@ require ( google.golang.org/protobuf v1.31.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/apiextensions-apiserver v0.24.2 // indirect k8s.io/cloud-provider v0.24.16 // indirect diff --git a/go.sum b/go.sum index ef329d2821..d5b0b08f44 100644 --- a/go.sum +++ b/go.sum @@ -196,8 +196,9 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= -github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -601,11 +602,16 @@ github.com/matoous/godox v0.0.0-20190911065817-5d6d842e92eb/go.mod h1:1BELzlh859 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -797,6 +803,9 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/rubiojr/go-vhd v0.0.0-20200706105327-02e210299021/go.mod h1:DM5xW0nvfNNm2uytzsvhI3OnX8uzaRAg8UX/CnDqbto= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -1266,8 +1275,11 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/pkg/config/generic/generic_base.go b/pkg/config/generic/generic_base.go index e301687ac8..a7234458af 100644 --- a/pkg/config/generic/generic_base.go +++ b/pkg/config/generic/generic_base.go @@ -36,6 +36,7 @@ type GenericConfiguration struct { *QoSConfiguration *MetricsConfiguration *AuthConfiguration + *LogConfiguration // ClientConnection specifies the kubeconfig file and client connection // settings for the proxy server to use when communicating with the apiserver. @@ -49,5 +50,6 @@ func NewGenericConfiguration() *GenericConfiguration { QoSConfiguration: NewQoSConfiguration(), MetricsConfiguration: NewMetricsConfiguration(), AuthConfiguration: NewAuthConfiguration(), + LogConfiguration: NewLogConfiguration(), } } diff --git a/pkg/config/generic/log.go b/pkg/config/generic/log.go new file mode 100644 index 0000000000..9ab1ea78d3 --- /dev/null +++ b/pkg/config/generic/log.go @@ -0,0 +1,28 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +type LogConfiguration struct { + SupportAsyncLogging bool + LogDir string + LogFileMaxSize int + LogBufferSizeMB int +} + +func NewLogConfiguration() *LogConfiguration { + return &LogConfiguration{} +} diff --git a/pkg/util/logging/logger.go b/pkg/util/logging/logger.go new file mode 100644 index 0000000000..05ed5ec64c --- /dev/null +++ b/pkg/util/logging/logger.go @@ -0,0 +1,75 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logging + +import ( + "time" + + "github.com/rs/zerolog/diode" + "gopkg.in/natefinch/lumberjack.v2" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +const ( + asyncLogger = "async_logger" + logger = "log" + async = "async" + metricsNameNumDroppedLogs = "number_of_dropped_logs" +) + +type AsyncLogger struct { + diodeWriter diode.Writer + logFile string +} + +func NewAsyncLogger(agentCtx *agent.GenericContext, logFile string, maxSizeMB int, bufferSizeMB int) *AsyncLogger { + wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(asyncLogger, metrics.MetricTag{ + Key: logger, + Val: async, + }) + // lumberjackLogger is a logger that rotates log files + lumberjackLogger := &lumberjack.Logger{ + Filename: logFile, + MaxSize: maxSizeMB, + } + asyncWriter := &AsyncLogger{ + logFile: logFile, + } + + // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them + diodeWriter := diode.NewWriter(lumberjackLogger, bufferSizeMB, 10*time.Millisecond, func(missed int) { + _ = wrappedEmitter.StoreInt64(metricsNameNumDroppedLogs, int64(missed), metrics.MetricTypeNameRaw) + }) + + asyncWriter.diodeWriter = diodeWriter + // Overrides the default synchronous writer with the diode writer + klog.SetOutput(diodeWriter) + return asyncWriter +} + +func (a *AsyncLogger) Write(p []byte) (n int, err error) { + return a.diodeWriter.Write(p) +} + +func (a *AsyncLogger) Shutdown() { + klog.Info("[Shutdown] async writer is shutting down...") + klog.Flush() + a.diodeWriter.Close() +} From 5fe074ecf2ab70fd3e24e482e8cfb5b178c44c64 Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Thu, 25 Sep 2025 14:56:48 +0800 Subject: [PATCH 2/7] chore: unit tests chore: unit tests chore: unit tests --- pkg/util/logging/logger.go | 8 +---- pkg/util/logging/logger_test.go | 63 +++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 pkg/util/logging/logger_test.go diff --git a/pkg/util/logging/logger.go b/pkg/util/logging/logger.go index 05ed5ec64c..c9caa0ba3a 100644 --- a/pkg/util/logging/logger.go +++ b/pkg/util/logging/logger.go @@ -28,9 +28,6 @@ import ( ) const ( - asyncLogger = "async_logger" - logger = "log" - async = "async" metricsNameNumDroppedLogs = "number_of_dropped_logs" ) @@ -40,10 +37,7 @@ type AsyncLogger struct { } func NewAsyncLogger(agentCtx *agent.GenericContext, logFile string, maxSizeMB int, bufferSizeMB int) *AsyncLogger { - wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(asyncLogger, metrics.MetricTag{ - Key: logger, - Val: async, - }) + wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter() // lumberjackLogger is a logger that rotates log files lumberjackLogger := &lumberjack.Logger{ Filename: logFile, diff --git a/pkg/util/logging/logger_test.go b/pkg/util/logging/logger_test.go new file mode 100644 index 0000000000..025582e233 --- /dev/null +++ b/pkg/util/logging/logger_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logging + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" + metrics_pool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" +) + +func TestNewAsyncLogger(t *testing.T) { + t.Parallel() + agentCtx := &agent.GenericContext{ + GenericContext: &katalyst_base.GenericContext{ + EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, + }, + } + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "test.log") + asyncLogger := NewAsyncLogger(agentCtx, logFilePath, 100, 100) + assert.NotNil(t, asyncLogger) +} + +func TestAsyncLogger_WriteAndShutdown(t *testing.T) { + t.Parallel() + agentCtx := &agent.GenericContext{ + GenericContext: &katalyst_base.GenericContext{ + EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, + }, + } + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "test.log") + asyncLogger := NewAsyncLogger(agentCtx, logFilePath, 100, 100) + assert.NotNil(t, asyncLogger) + + _, err := asyncLogger.Write([]byte("test")) + assert.NoError(t, err, "write log should not fail") + + asyncLogger.Shutdown() + + _, err = os.Stat(logFilePath) + assert.NoError(t, err, "log file should be created") +} From 4e894b93985f2dd8f3fcc2fa68b0dad92f7b3fb9 Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Thu, 25 Sep 2025 17:03:58 +0800 Subject: [PATCH 3/7] feat: split up logs based on severity feat: split up logs based on severity feat: split up logs based on severity feat: split up logs based on severity feat: split up logs based on severity feat: split up logs based on severity --- cmd/base/options/log.go | 1 - cmd/katalyst-agent/app/agent.go | 4 +- pkg/util/logging/logger.go | 80 +++++++++++++++++++++++---------- pkg/util/logging/logger_test.go | 27 +---------- 4 files changed, 59 insertions(+), 53 deletions(-) diff --git a/cmd/base/options/log.go b/cmd/base/options/log.go index 857be70f73..1380dca333 100644 --- a/cmd/base/options/log.go +++ b/cmd/base/options/log.go @@ -20,7 +20,6 @@ import ( "github.com/spf13/pflag" "github.com/kubewharf/katalyst-core/pkg/config/generic" - "github.com/kubewharf/katalyst-core/pkg/util/general" ) diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index 16259b36b7..6b5a10bc16 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -43,8 +43,6 @@ const ( metricsNameAgentStarted = "agent_started" ) -const defaultLogFileName = "/opt/tiger/toutiao/log/app/agent.log" - // Run starts common and uniformed agent components here, and starts other // specific components in other separate repos (with common components as // dependencies) @@ -66,7 +64,7 @@ func Run( } if conf.SupportAsyncLogging { - asyncLogger := logging.NewAsyncLogger(genericCtx, defaultLogFileName, conf.LogFileMaxSize, conf.LogBufferSizeMB) + asyncLogger := logging.NewAsyncLogger(genericCtx, conf.LogFileMaxSize, conf.LogBufferSizeMB) defer asyncLogger.Shutdown() } diff --git a/pkg/util/logging/logger.go b/pkg/util/logging/logger.go index c9caa0ba3a..2087a6f7be 100644 --- a/pkg/util/logging/logger.go +++ b/pkg/util/logging/logger.go @@ -27,43 +27,75 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metrics" ) +type SeverityName string + +const ( + InfoSeverity SeverityName = "INFO" + WarningSeverity SeverityName = "WARNING" + ErrorSeverity SeverityName = "ERROR" + FatalSeverity SeverityName = "FATAL" +) + const ( - metricsNameNumDroppedLogs = "number_of_dropped_logs" + metricsNameNumDroppedInfoLogs = "number_of_dropped_info_logs" + metricsNameNumDroppedWarningLogs = "number_of_dropped_warning_logs" + metricsNameNumDroppedErrorLogs = "number_of_dropped_error_logs" + metricsNameNumDroppedFatalLogs = "number_of_dropped_fatal_logs" ) +const ( + defaultInfoLogFileName = "/opt/tiger/toutiao/log/app/agent.info.log" + defaultWarningLogFileName = "/opt/tiger/toutiao/log/app/agent.warning.log" + defaultErrorLogFileName = "/opt/tiger/toutiao/log/app/agent.error.log" + defaultFatalLogFileName = "/opt/tiger/toutiao/log/app/agent.fatal.log" +) + +type logInfo struct { + fileName string + metricsName string +} + +var logInfoMap = map[SeverityName]*logInfo{ + InfoSeverity: {fileName: defaultInfoLogFileName, metricsName: metricsNameNumDroppedInfoLogs}, + WarningSeverity: {fileName: defaultWarningLogFileName, metricsName: metricsNameNumDroppedWarningLogs}, + ErrorSeverity: {fileName: defaultErrorLogFileName, metricsName: metricsNameNumDroppedErrorLogs}, + FatalSeverity: {fileName: defaultFatalLogFileName, metricsName: metricsNameNumDroppedFatalLogs}, +} + type AsyncLogger struct { - diodeWriter diode.Writer - logFile string + diodeWriters []diode.Writer + logFile string } -func NewAsyncLogger(agentCtx *agent.GenericContext, logFile string, maxSizeMB int, bufferSizeMB int) *AsyncLogger { +// NewAsyncLogger creates an async logger that produces an async writer for each of the severity levels. +// The async writer spins up a goroutine that periodically flushes the buffered logs to disk. +func NewAsyncLogger(agentCtx *agent.GenericContext, maxSizeMB int, bufferSizeMB int) *AsyncLogger { wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter() - // lumberjackLogger is a logger that rotates log files - lumberjackLogger := &lumberjack.Logger{ - Filename: logFile, - MaxSize: maxSizeMB, - } - asyncWriter := &AsyncLogger{ - logFile: logFile, - } - // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them - diodeWriter := diode.NewWriter(lumberjackLogger, bufferSizeMB, 10*time.Millisecond, func(missed int) { - _ = wrappedEmitter.StoreInt64(metricsNameNumDroppedLogs, int64(missed), metrics.MetricTypeNameRaw) - }) + asyncLogger := &AsyncLogger{} + for severity, logInfo := range logInfoMap { + // lumberjackLogger is a logger that rotates log files + lumberjackLogger := &lumberjack.Logger{ + Filename: logInfo.fileName, + MaxSize: maxSizeMB, + } - asyncWriter.diodeWriter = diodeWriter - // Overrides the default synchronous writer with the diode writer - klog.SetOutput(diodeWriter) - return asyncWriter -} + // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them + diodeWriter := diode.NewWriter(lumberjackLogger, bufferSizeMB, 10*time.Millisecond, func(missed int) { + _ = wrappedEmitter.StoreInt64(logInfo.metricsName, int64(missed), metrics.MetricTypeNameRaw) + }) + // Overrides the default synchronous writer with the diode writer + klog.SetOutputBySeverity(string(severity), diodeWriter) + asyncLogger.diodeWriters = append(asyncLogger.diodeWriters, diodeWriter) + } -func (a *AsyncLogger) Write(p []byte) (n int, err error) { - return a.diodeWriter.Write(p) + return asyncLogger } func (a *AsyncLogger) Shutdown() { klog.Info("[Shutdown] async writer is shutting down...") klog.Flush() - a.diodeWriter.Close() + for _, writer := range a.diodeWriters { + writer.Close() + } } diff --git a/pkg/util/logging/logger_test.go b/pkg/util/logging/logger_test.go index 025582e233..9802d80e4f 100644 --- a/pkg/util/logging/logger_test.go +++ b/pkg/util/logging/logger_test.go @@ -17,8 +17,6 @@ limitations under the License. package logging import ( - "os" - "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -35,29 +33,8 @@ func TestNewAsyncLogger(t *testing.T) { EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, }, } - tempDir := t.TempDir() - logFilePath := filepath.Join(tempDir, "test.log") - asyncLogger := NewAsyncLogger(agentCtx, logFilePath, 100, 100) + asyncLogger := NewAsyncLogger(agentCtx, 100, 100) assert.NotNil(t, asyncLogger) -} - -func TestAsyncLogger_WriteAndShutdown(t *testing.T) { - t.Parallel() - agentCtx := &agent.GenericContext{ - GenericContext: &katalyst_base.GenericContext{ - EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, - }, - } - tempDir := t.TempDir() - logFilePath := filepath.Join(tempDir, "test.log") - asyncLogger := NewAsyncLogger(agentCtx, logFilePath, 100, 100) - assert.NotNil(t, asyncLogger) - - _, err := asyncLogger.Write([]byte("test")) - assert.NoError(t, err, "write log should not fail") - - asyncLogger.Shutdown() - _, err = os.Stat(logFilePath) - assert.NoError(t, err, "log file should be created") + assert.Equal(t, len(asyncLogger.diodeWriters), 4) } From b43b2f8cd2bbad68612b735e0c67fee688f2c3be Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Fri, 17 Oct 2025 15:48:56 +0800 Subject: [PATCH 4/7] feat: add cleanup parameters in async logging feat: add cleanup parameters in async logging feat: add cleanup parameters in async logging --- cmd/base/options/log.go | 9 ++++++++- cmd/katalyst-agent/app/agent.go | 2 +- pkg/config/generic/log.go | 2 ++ pkg/util/logging/logger.go | 20 ++++++++++++-------- pkg/util/logging/logger_test.go | 2 +- 5 files changed, 24 insertions(+), 11 deletions(-) diff --git a/cmd/base/options/log.go b/cmd/base/options/log.go index 1380dca333..f476056ad7 100644 --- a/cmd/base/options/log.go +++ b/cmd/base/options/log.go @@ -29,14 +29,17 @@ type LogsOptions struct { SupportAsyncLogging bool LogDir string LogBufferSizeMB int + LogFileMaxAge int + LogFileMaxBackups int } func NewLogsOptions() *LogsOptions { return &LogsOptions{ LogPackageLevel: general.LoggingPKGFull, LogFileMaxSizeInMB: 1800, - LogDir: "/opt/tiger/toutiao/log/app", LogBufferSizeMB: 10000, + LogFileMaxAge: 7, + LogFileMaxBackups: 10, } } @@ -47,6 +50,8 @@ func (o *LogsOptions) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.SupportAsyncLogging, "support-async-logging", o.SupportAsyncLogging, "whether to support async logging") fs.StringVar(&o.LogDir, "log-dir", o.LogDir, "directory of log file") fs.IntVar(&o.LogBufferSizeMB, "log-buffer-size", o.LogBufferSizeMB, "size of the ring buffer to store async logs") + fs.IntVar(&o.LogFileMaxAge, "log-file-max-age", o.LogFileMaxAge, "max age of klog log file in days") + fs.IntVar(&o.LogFileMaxBackups, "log-file-max-backups", o.LogFileMaxBackups, "max number of klog log file backups") } func (o *LogsOptions) ApplyTo(c *generic.LogConfiguration) error { @@ -56,5 +61,7 @@ func (o *LogsOptions) ApplyTo(c *generic.LogConfiguration) error { c.LogDir = o.LogDir c.LogFileMaxSize = int(o.LogFileMaxSizeInMB) c.LogBufferSizeMB = o.LogBufferSizeMB + c.LogFileMaxAge = o.LogFileMaxAge + c.LogFileMaxBackups = o.LogFileMaxBackups return nil } diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index 6b5a10bc16..df47c76082 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -64,7 +64,7 @@ func Run( } if conf.SupportAsyncLogging { - asyncLogger := logging.NewAsyncLogger(genericCtx, conf.LogFileMaxSize, conf.LogBufferSizeMB) + asyncLogger := logging.NewAsyncLogger(genericCtx, conf.LogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSizeMB) defer asyncLogger.Shutdown() } diff --git a/pkg/config/generic/log.go b/pkg/config/generic/log.go index 9ab1ea78d3..f30f5baa3a 100644 --- a/pkg/config/generic/log.go +++ b/pkg/config/generic/log.go @@ -21,6 +21,8 @@ type LogConfiguration struct { LogDir string LogFileMaxSize int LogBufferSizeMB int + LogFileMaxAge int + LogFileMaxBackups int } func NewLogConfiguration() *LogConfiguration { diff --git a/pkg/util/logging/logger.go b/pkg/util/logging/logger.go index 2087a6f7be..a2a0a161cd 100644 --- a/pkg/util/logging/logger.go +++ b/pkg/util/logging/logger.go @@ -17,6 +17,7 @@ limitations under the License. package logging import ( + "path" "time" "github.com/rs/zerolog/diode" @@ -44,10 +45,10 @@ const ( ) const ( - defaultInfoLogFileName = "/opt/tiger/toutiao/log/app/agent.info.log" - defaultWarningLogFileName = "/opt/tiger/toutiao/log/app/agent.warning.log" - defaultErrorLogFileName = "/opt/tiger/toutiao/log/app/agent.error.log" - defaultFatalLogFileName = "/opt/tiger/toutiao/log/app/agent.fatal.log" + defaultInfoLogFileName = "agent.info.log" + defaultWarningLogFileName = "agent.warning.log" + defaultErrorLogFileName = "agent.error.log" + defaultFatalLogFileName = "agent.fatal.log" ) type logInfo struct { @@ -64,20 +65,23 @@ var logInfoMap = map[SeverityName]*logInfo{ type AsyncLogger struct { diodeWriters []diode.Writer - logFile string } // NewAsyncLogger creates an async logger that produces an async writer for each of the severity levels. // The async writer spins up a goroutine that periodically flushes the buffered logs to disk. -func NewAsyncLogger(agentCtx *agent.GenericContext, maxSizeMB int, bufferSizeMB int) *AsyncLogger { +func NewAsyncLogger( + agentCtx *agent.GenericContext, logDir string, maxSizeMB, maxAge, maxBackups, bufferSizeMB int, +) *AsyncLogger { wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter() asyncLogger := &AsyncLogger{} for severity, logInfo := range logInfoMap { // lumberjackLogger is a logger that rotates log files lumberjackLogger := &lumberjack.Logger{ - Filename: logInfo.fileName, - MaxSize: maxSizeMB, + Filename: path.Join(logDir, logInfo.fileName), + MaxSize: maxSizeMB, + MaxAge: maxAge, + MaxBackups: maxBackups, } // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them diff --git a/pkg/util/logging/logger_test.go b/pkg/util/logging/logger_test.go index 9802d80e4f..e473758c80 100644 --- a/pkg/util/logging/logger_test.go +++ b/pkg/util/logging/logger_test.go @@ -33,7 +33,7 @@ func TestNewAsyncLogger(t *testing.T) { EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, }, } - asyncLogger := NewAsyncLogger(agentCtx, 100, 100) + asyncLogger := NewAsyncLogger(agentCtx, "testDir", 100, 100, 100, 100) assert.NotNil(t, asyncLogger) assert.Equal(t, len(asyncLogger.diodeWriters), 4) From ff5da7088cce8a6ee452fee28ae6f97c1ad3ce63 Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Tue, 21 Oct 2025 11:31:31 +0800 Subject: [PATCH 5/7] fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments fix: PR comments --- cmd/base/options/log.go | 12 ++++++------ cmd/katalyst-agent/app/agent.go | 2 +- pkg/config/generic/log.go | 2 +- pkg/util/logging/logger.go | 18 +++++++++++------- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/cmd/base/options/log.go b/cmd/base/options/log.go index f476056ad7..223e294581 100644 --- a/cmd/base/options/log.go +++ b/cmd/base/options/log.go @@ -28,7 +28,7 @@ type LogsOptions struct { LogFileMaxSizeInMB uint64 SupportAsyncLogging bool LogDir string - LogBufferSizeMB int + LogBufferSize int LogFileMaxAge int LogFileMaxBackups int } @@ -37,7 +37,7 @@ func NewLogsOptions() *LogsOptions { return &LogsOptions{ LogPackageLevel: general.LoggingPKGFull, LogFileMaxSizeInMB: 1800, - LogBufferSizeMB: 10000, + LogBufferSize: 10000, LogFileMaxAge: 7, LogFileMaxBackups: 10, } @@ -45,11 +45,11 @@ func NewLogsOptions() *LogsOptions { // AddFlags adds flags to the specified FlagSet. func (o *LogsOptions) AddFlags(fs *pflag.FlagSet) { + fs.BoolVar(&o.SupportAsyncLogging, "support-async-logging", o.SupportAsyncLogging, "whether to support async logging") + fs.StringVar(&o.LogDir, "async_log_dir", o.LogDir, "directory to store logs") fs.Var(&o.LogPackageLevel, "logs-package-level", "the default package level for logging") fs.Uint64Var(&o.LogFileMaxSizeInMB, "log-file-max-size", o.LogFileMaxSizeInMB, "Max size of klog file in MB.") - fs.BoolVar(&o.SupportAsyncLogging, "support-async-logging", o.SupportAsyncLogging, "whether to support async logging") - fs.StringVar(&o.LogDir, "log-dir", o.LogDir, "directory of log file") - fs.IntVar(&o.LogBufferSizeMB, "log-buffer-size", o.LogBufferSizeMB, "size of the ring buffer to store async logs") + fs.IntVar(&o.LogBufferSize, "log-buffer-size", o.LogBufferSize, "size of the ring buffer to store async logs") fs.IntVar(&o.LogFileMaxAge, "log-file-max-age", o.LogFileMaxAge, "max age of klog log file in days") fs.IntVar(&o.LogFileMaxBackups, "log-file-max-backups", o.LogFileMaxBackups, "max number of klog log file backups") } @@ -59,8 +59,8 @@ func (o *LogsOptions) ApplyTo(c *generic.LogConfiguration) error { general.SetLogFileMaxSize(o.LogFileMaxSizeInMB) c.SupportAsyncLogging = o.SupportAsyncLogging c.LogDir = o.LogDir + c.LogBufferSize = o.LogBufferSize c.LogFileMaxSize = int(o.LogFileMaxSizeInMB) - c.LogBufferSizeMB = o.LogBufferSizeMB c.LogFileMaxAge = o.LogFileMaxAge c.LogFileMaxBackups = o.LogFileMaxBackups return nil diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index df47c76082..f7b7e3372c 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -64,7 +64,7 @@ func Run( } if conf.SupportAsyncLogging { - asyncLogger := logging.NewAsyncLogger(genericCtx, conf.LogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSizeMB) + asyncLogger := logging.NewAsyncLogger(genericCtx, conf.LogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSize) defer asyncLogger.Shutdown() } diff --git a/pkg/config/generic/log.go b/pkg/config/generic/log.go index f30f5baa3a..a4810c123a 100644 --- a/pkg/config/generic/log.go +++ b/pkg/config/generic/log.go @@ -20,7 +20,7 @@ type LogConfiguration struct { SupportAsyncLogging bool LogDir string LogFileMaxSize int - LogBufferSizeMB int + LogBufferSize int LogFileMaxAge int LogFileMaxBackups int } diff --git a/pkg/util/logging/logger.go b/pkg/util/logging/logger.go index a2a0a161cd..37f8e76abf 100644 --- a/pkg/util/logging/logger.go +++ b/pkg/util/logging/logger.go @@ -17,7 +17,10 @@ limitations under the License. package logging import ( + "fmt" + "os" "path" + "path/filepath" "time" "github.com/rs/zerolog/diode" @@ -44,11 +47,12 @@ const ( metricsNameNumDroppedFatalLogs = "number_of_dropped_fatal_logs" ) -const ( - defaultInfoLogFileName = "agent.info.log" - defaultWarningLogFileName = "agent.warning.log" - defaultErrorLogFileName = "agent.error.log" - defaultFatalLogFileName = "agent.fatal.log" +var ( + basePath = filepath.Base(os.Args[0]) + defaultInfoLogFileName = fmt.Sprintf("%s.%s.log", basePath, InfoSeverity) + defaultWarningLogFileName = fmt.Sprintf("%s.%s.log", basePath, WarningSeverity) + defaultErrorLogFileName = fmt.Sprintf("%s.%s.log", basePath, ErrorSeverity) + defaultFatalLogFileName = fmt.Sprintf("%s.%s.log", basePath, FatalSeverity) ) type logInfo struct { @@ -70,7 +74,7 @@ type AsyncLogger struct { // NewAsyncLogger creates an async logger that produces an async writer for each of the severity levels. // The async writer spins up a goroutine that periodically flushes the buffered logs to disk. func NewAsyncLogger( - agentCtx *agent.GenericContext, logDir string, maxSizeMB, maxAge, maxBackups, bufferSizeMB int, + agentCtx *agent.GenericContext, logDir string, maxSizeMB, maxAge, maxBackups, bufferSize int, ) *AsyncLogger { wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter() @@ -85,7 +89,7 @@ func NewAsyncLogger( } // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them - diodeWriter := diode.NewWriter(lumberjackLogger, bufferSizeMB, 10*time.Millisecond, func(missed int) { + diodeWriter := diode.NewWriter(lumberjackLogger, bufferSize, 10*time.Millisecond, func(missed int) { _ = wrappedEmitter.StoreInt64(logInfo.metricsName, int64(missed), metrics.MetricTypeNameRaw) }) // Overrides the default synchronous writer with the diode writer From 6e1dffc256f5a67fb04a8c3605951d6f6b9af526 Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Wed, 22 Oct 2025 10:32:04 +0800 Subject: [PATCH 6/7] feat: make logger more dynamic by allowing synchronous lumberjack feat: make logger more dynamic by allowing synchronous lumberjack feat: make logger more dynamic by allowing synchronous lumberjack feat: make logger more dynamic by allowing synchronous lumberjack feat: make logger more dynamic by allowing synchronous lumberjack feat: make logger more dynamic by allowing synchronous lumberjack --- cmd/base/options/log.go | 22 +++++------- cmd/katalyst-agent/app/agent.go | 6 ++-- pkg/config/generic/log.go | 11 +++--- pkg/util/logging/logger.go | 45 +++++++++++++++--------- pkg/util/logging/logger_test.go | 62 +++++++++++++++++++++++++++++---- 5 files changed, 99 insertions(+), 47 deletions(-) diff --git a/cmd/base/options/log.go b/cmd/base/options/log.go index 223e294581..7af1101db9 100644 --- a/cmd/base/options/log.go +++ b/cmd/base/options/log.go @@ -24,29 +24,24 @@ import ( ) type LogsOptions struct { - LogPackageLevel general.LoggingPKG - LogFileMaxSizeInMB uint64 - SupportAsyncLogging bool - LogDir string - LogBufferSize int - LogFileMaxAge int - LogFileMaxBackups int + LogPackageLevel general.LoggingPKG + LogFileMaxSizeInMB uint64 + CustomLogDir string + LogBufferSize int + LogFileMaxAge int + LogFileMaxBackups int } func NewLogsOptions() *LogsOptions { return &LogsOptions{ LogPackageLevel: general.LoggingPKGFull, LogFileMaxSizeInMB: 1800, - LogBufferSize: 10000, - LogFileMaxAge: 7, - LogFileMaxBackups: 10, } } // AddFlags adds flags to the specified FlagSet. func (o *LogsOptions) AddFlags(fs *pflag.FlagSet) { - fs.BoolVar(&o.SupportAsyncLogging, "support-async-logging", o.SupportAsyncLogging, "whether to support async logging") - fs.StringVar(&o.LogDir, "async_log_dir", o.LogDir, "directory to store logs") + fs.StringVar(&o.CustomLogDir, "custom_log_dir", o.CustomLogDir, "directory to store logs for custom logger") fs.Var(&o.LogPackageLevel, "logs-package-level", "the default package level for logging") fs.Uint64Var(&o.LogFileMaxSizeInMB, "log-file-max-size", o.LogFileMaxSizeInMB, "Max size of klog file in MB.") fs.IntVar(&o.LogBufferSize, "log-buffer-size", o.LogBufferSize, "size of the ring buffer to store async logs") @@ -57,8 +52,7 @@ func (o *LogsOptions) AddFlags(fs *pflag.FlagSet) { func (o *LogsOptions) ApplyTo(c *generic.LogConfiguration) error { general.SetDefaultLoggingPackage(o.LogPackageLevel) general.SetLogFileMaxSize(o.LogFileMaxSizeInMB) - c.SupportAsyncLogging = o.SupportAsyncLogging - c.LogDir = o.LogDir + c.CustomLogDir = o.CustomLogDir c.LogBufferSize = o.LogBufferSize c.LogFileMaxSize = int(o.LogFileMaxSizeInMB) c.LogFileMaxAge = o.LogFileMaxAge diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index f7b7e3372c..4ea192e29c 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -63,10 +63,8 @@ func Run( return err } - if conf.SupportAsyncLogging { - asyncLogger := logging.NewAsyncLogger(genericCtx, conf.LogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSize) - defer asyncLogger.Shutdown() - } + asyncLogger := logging.NewCustomLogger(genericCtx, conf.CustomLogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSize) + defer asyncLogger.Shutdown() for _, genericOption := range genericOptions { genericOption(genericCtx) diff --git a/pkg/config/generic/log.go b/pkg/config/generic/log.go index a4810c123a..cd168a770a 100644 --- a/pkg/config/generic/log.go +++ b/pkg/config/generic/log.go @@ -17,12 +17,11 @@ limitations under the License. package generic type LogConfiguration struct { - SupportAsyncLogging bool - LogDir string - LogFileMaxSize int - LogBufferSize int - LogFileMaxAge int - LogFileMaxBackups int + CustomLogDir string + LogFileMaxSize int + LogBufferSize int + LogFileMaxAge int + LogFileMaxBackups int } func NewLogConfiguration() *LogConfiguration { diff --git a/pkg/util/logging/logger.go b/pkg/util/logging/logger.go index 37f8e76abf..f35ad4e115 100644 --- a/pkg/util/logging/logger.go +++ b/pkg/util/logging/logger.go @@ -67,40 +67,53 @@ var logInfoMap = map[SeverityName]*logInfo{ FatalSeverity: {fileName: defaultFatalLogFileName, metricsName: metricsNameNumDroppedFatalLogs}, } -type AsyncLogger struct { +type CustomLogger struct { diodeWriters []diode.Writer } -// NewAsyncLogger creates an async logger that produces an async writer for each of the severity levels. -// The async writer spins up a goroutine that periodically flushes the buffered logs to disk. -func NewAsyncLogger( +// NewCustomLogger creates a custom logger that can either be asynchronous or synchronous, depending on configuration. +func NewCustomLogger( agentCtx *agent.GenericContext, logDir string, maxSizeMB, maxAge, maxBackups, bufferSize int, -) *AsyncLogger { +) *CustomLogger { wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter() - asyncLogger := &AsyncLogger{} + // If logDir is not set, we are still using klog's native logger, so we just return an empty logger without calling SetOutput() + if logDir == "" { + return &CustomLogger{} + } + + customLogger := &CustomLogger{} for severity, logInfo := range logInfoMap { + filePath := path.Join(logDir, logInfo.fileName) + // lumberjackLogger is a logger that rotates log files lumberjackLogger := &lumberjack.Logger{ - Filename: path.Join(logDir, logInfo.fileName), + Filename: filePath, MaxSize: maxSizeMB, MaxAge: maxAge, MaxBackups: maxBackups, } - // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them - diodeWriter := diode.NewWriter(lumberjackLogger, bufferSize, 10*time.Millisecond, func(missed int) { - _ = wrappedEmitter.StoreInt64(logInfo.metricsName, int64(missed), metrics.MetricTypeNameRaw) - }) - // Overrides the default synchronous writer with the diode writer - klog.SetOutputBySeverity(string(severity), diodeWriter) - asyncLogger.diodeWriters = append(asyncLogger.diodeWriters, diodeWriter) + // Enable async logger if buffer size is more than 0; otherwise, use synchronous lumberjack logger + if bufferSize > 0 { + // diodeWriter is a writer that stores logs in a ring buffer and asynchronously flushes them to disk + diodeWriter := diode.NewWriter(lumberjackLogger, bufferSize, 10*time.Millisecond, func(missed int) { + _ = wrappedEmitter.StoreInt64(logInfo.metricsName, int64(missed), metrics.MetricTypeNameRaw) + }) + // Overrides the default synchronous writer with the diode writer + klog.SetOutputBySeverity(string(severity), diodeWriter) + customLogger.diodeWriters = append(customLogger.diodeWriters, diodeWriter) + klog.Infof("custom async logger is enabled for the severity %s", severity) + } else { + klog.SetOutputBySeverity(string(severity), lumberjackLogger) + klog.Infof("custom sync logger is enabled for the severity %s", severity) + } } - return asyncLogger + return customLogger } -func (a *AsyncLogger) Shutdown() { +func (a *CustomLogger) Shutdown() { klog.Info("[Shutdown] async writer is shutting down...") klog.Flush() for _, writer := range a.diodeWriters { diff --git a/pkg/util/logging/logger_test.go b/pkg/util/logging/logger_test.go index e473758c80..fd1badf994 100644 --- a/pkg/util/logging/logger_test.go +++ b/pkg/util/logging/logger_test.go @@ -26,15 +26,63 @@ import ( metrics_pool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" ) -func TestNewAsyncLogger(t *testing.T) { +func TestNewCustomLogger(t *testing.T) { t.Parallel() - agentCtx := &agent.GenericContext{ - GenericContext: &katalyst_base.GenericContext{ - EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, + testCases := []struct { + name string + logDir string + maxSizeMB int + maxAge int + maxBackups int + bufferSize int + expectedDiodeWritersCount int + }{ + { + name: "Disabled logger (logDir is empty), logger is nil", + logDir: "", + maxSizeMB: 10, + maxAge: 0, + maxBackups: 10, + bufferSize: 1024, + expectedDiodeWritersCount: 0, + }, + { + name: "Synchronous logger (bufferSize=0)", + logDir: t.TempDir(), + maxSizeMB: 1, + maxAge: 1, + maxBackups: 1, + bufferSize: 0, + expectedDiodeWritersCount: 0, + }, + { + name: "Asynchronous logger", + logDir: t.TempDir(), + maxSizeMB: 1, + maxAge: 1, + maxBackups: 1, + bufferSize: 1024, + expectedDiodeWritersCount: 4, }, } - asyncLogger := NewAsyncLogger(agentCtx, "testDir", 100, 100, 100, 100) - assert.NotNil(t, asyncLogger) - assert.Equal(t, len(asyncLogger.diodeWriters), 4) + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + agentCtx := &agent.GenericContext{ + GenericContext: &katalyst_base.GenericContext{ + EmitterPool: metrics_pool.DummyMetricsEmitterPool{}, + }, + } + + logger := NewCustomLogger(agentCtx, tc.logDir, tc.maxSizeMB, tc.maxAge, tc.maxBackups, tc.bufferSize) + + assert.NotNil(t, logger) + assert.Len(t, logger.diodeWriters, tc.expectedDiodeWritersCount) + assert.NotPanics(t, func() { + logger.Shutdown() + }) + }) + } } From 533503cb8ac7b5317609fdd8c509aa1f94ea1a4b Mon Sep 17 00:00:00 2001 From: "justin.cheng" Date: Thu, 23 Oct 2025 15:33:28 +0800 Subject: [PATCH 7/7] fix: PR comments --- cmd/katalyst-agent/app/agent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/katalyst-agent/app/agent.go b/cmd/katalyst-agent/app/agent.go index 4ea192e29c..619f7dbe8a 100644 --- a/cmd/katalyst-agent/app/agent.go +++ b/cmd/katalyst-agent/app/agent.go @@ -63,8 +63,8 @@ func Run( return err } - asyncLogger := logging.NewCustomLogger(genericCtx, conf.CustomLogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSize) - defer asyncLogger.Shutdown() + customLogger := logging.NewCustomLogger(genericCtx, conf.CustomLogDir, conf.LogFileMaxSize, conf.LogFileMaxAge, conf.LogFileMaxBackups, conf.LogBufferSize) + defer customLogger.Shutdown() for _, genericOption := range genericOptions { genericOption(genericCtx)