diff --git a/internal/kernelparams/kernelparams.go b/internal/kernelparams/kernelparams.go index f92aa3212e..79ead41e5a 100644 --- a/internal/kernelparams/kernelparams.go +++ b/internal/kernelparams/kernelparams.go @@ -94,8 +94,8 @@ func atomicFileWrite(kernelParamsFile string, data []byte) error { return os.Rename(tempFile.Name(), kernelParamsFile) } -// pathForParam returns the sysfs path for a given parameter. -func pathForParam(name ParamName, major, minor uint32) (string, error) { +// PathForParam returns the sysfs path for a given parameter. +func PathForParam(name ParamName, major, minor uint32) (string, error) { switch name { case MaxReadAheadKb: return fmt.Sprintf("/sys/class/bdi/%d:%d/read_ahead_kb", major, minor), nil @@ -159,7 +159,7 @@ func (c *KernelParamsConfig) applyDirectly(mountPoint string) { return } for _, p := range c.Parameters { - path, err := pathForParam(p.Name, major, minor) + path, err := PathForParam(p.Name, major, minor) if err != nil { logger.Warnf("Unable to update setting %q to value %q for the mount point %q due to err: %v", p.Name, p.Value, mountPoint, err) continue diff --git a/internal/kernelparams/kernelparams_test.go b/internal/kernelparams/kernelparams_test.go index c0a10ad52c..a19bb33a62 100644 --- a/internal/kernelparams/kernelparams_test.go +++ b/internal/kernelparams/kernelparams_test.go @@ -79,7 +79,7 @@ func TestPathForParam(t *testing.T) { for _, tt := range tests { t.Run(string(tt.name), func(t *testing.T) { - path, err := pathForParam(tt.name, tt.major, tt.minor) + path, err := PathForParam(tt.name, tt.major, tt.minor) if tt.expectErr { assert.Error(t, err) diff --git a/tools/integration_tests/flag_optimizations/setup_test.go b/tools/integration_tests/flag_optimizations/setup_test.go index 6db32ef7d6..2207a13440 100644 --- a/tools/integration_tests/flag_optimizations/setup_test.go +++ b/tools/integration_tests/flag_optimizations/setup_test.go @@ -20,7 +20,6 @@ import ( "log" "os" "path" - "strings" "testing" "cloud.google.com/go/storage" @@ -81,15 +80,6 @@ func mustMountGCSFuseAndSetupTestDir(flags []string, ctx context.Context, storag } } -func overrideFilePathsInFlagSet(t *test_suite.TestConfig, GCSFuseTempDirPath string) { - for _, flags := range t.Configs { - for i := range flags.Flags { - // Iterate over the indices of the flags slice - flags.Flags[i] = strings.ReplaceAll(flags.Flags[i], "/gcsfuse-tmp", path.Join(GCSFuseTempDirPath, "gcsfuse-tmp")) - } - } -} - //////////////////////////////////////////////////////////////////////// // TestMain //////////////////////////////////////////////////////////////////////// @@ -106,8 +96,8 @@ func TestMain(m *testing.M) { cfg.FlagOptimizations[0].TestBucket = setup.TestBucket() cfg.FlagOptimizations[0].GKEMountedDirectory = setup.MountedDirectory() cfg.FlagOptimizations[0].LogFile = setup.LogFile() - // Initialize the slice to hold 6 specific test configurations - cfg.FlagOptimizations[0].Configs = make([]test_suite.ConfigItem, 5) + // Initialize the slice to hold 11 specific test configurations + cfg.FlagOptimizations[0].Configs = make([]test_suite.ConfigItem, 12) cfg.FlagOptimizations[0].Configs[0].Run = "TestMountFails" cfg.FlagOptimizations[0].Configs[0].Flags = []string{"--profile=unknown-profile"} cfg.FlagOptimizations[0].Configs[0].Compatible = map[string]bool{"flat": true, "hns": true, "zonal": true} @@ -140,6 +130,46 @@ func TestMain(m *testing.M) { } cfg.FlagOptimizations[0].Configs[4].Compatible = map[string]bool{"flat": true, "hns": false, "zonal": false} cfg.FlagOptimizations[0].Configs[4].RunOnGKE = true + + cfg.FlagOptimizations[0].Configs[5].Run = "TestZonalBucketOptimizations" + cfg.FlagOptimizations[0].Configs[5].Flags = []string{"--log-severity=trace"} + cfg.FlagOptimizations[0].Configs[5].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[5].RunOnGKE = false + + cfg.FlagOptimizations[0].Configs[6].Run = "TestZonalBucketOptimizations_ExplicitOverrides" + cfg.FlagOptimizations[0].Configs[6].Flags = []string{"--implicit-dirs --max-read-ahead-kb=2048 --max-background=50 --congestion-threshold=30 --log-severity=trace"} + cfg.FlagOptimizations[0].Configs[6].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[6].RunOnGKE = false + + cfg.FlagOptimizations[0].Configs[7].Run = "TestZonalBucketOptimizations_Dynamic" + cfg.FlagOptimizations[0].Configs[7].Flags = []string{"--log-severity=trace"} + cfg.FlagOptimizations[0].Configs[7].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[7].RunOnGKE = false + + cfg.FlagOptimizations[0].Configs[8].Run = "TestKernelReader_DefaultAndPrecedence" + cfg.FlagOptimizations[0].Configs[8].Flags = []string{ + "--implicit-dirs --log-severity=trace", + "--implicit-dirs --log-severity=trace --cache-dir=/gcsfuse-tmp/TestKernelReader_DefaultAndPrecedence_FileCache", + "--implicit-dirs --log-severity=trace --enable-buffered-read=true", + "--implicit-dirs --log-severity=trace --enable-buffered-read=true --cache-dir=/gcsfuse-tmp/TestKernelReader_DefaultAndPrecedence_Both", + } + cfg.FlagOptimizations[0].Configs[8].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[8].RunOnGKE = false + + cfg.FlagOptimizations[0].Configs[9].Run = "TestFileCache_KernelReaderDisabled" + cfg.FlagOptimizations[0].Configs[9].Flags = []string{"--implicit-dirs --log-severity=trace --enable-kernel-reader=false --cache-dir=/gcsfuse-tmp/TestFileCache_KernelReaderDisabled"} + cfg.FlagOptimizations[0].Configs[9].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[9].RunOnGKE = false + + cfg.FlagOptimizations[0].Configs[10].Run = "TestBufferedReader_KernelReaderDisabled" + cfg.FlagOptimizations[0].Configs[10].Flags = []string{"--implicit-dirs --log-severity=trace --enable-kernel-reader=false --enable-buffered-read"} + cfg.FlagOptimizations[0].Configs[10].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[10].RunOnGKE = false + + cfg.FlagOptimizations[0].Configs[11].Run = "TestKernelReader_Dynamic" + cfg.FlagOptimizations[0].Configs[11].Flags = []string{"--implicit-dirs --log-severity=trace"} + cfg.FlagOptimizations[0].Configs[11].Compatible = map[string]bool{"flat": false, "hns": false, "zonal": true} + cfg.FlagOptimizations[0].Configs[11].RunOnGKE = false } testEnv.ctx = context.Background() @@ -164,7 +194,7 @@ func TestMain(m *testing.M) { // Set up test directory. setup.SetUpTestDirForTestBucket(&testEnv.cfg) // Override GKE specific paths with GCSFuse paths if running in GCE environment. - overrideFilePathsInFlagSet(&testEnv.cfg, setup.TestDir()) + setup.OverrideFilePathsInFlagSet(&testEnv.cfg, setup.TestDir()) // Save mount and root directory variables. testEnv.mountDir, testEnv.rootDir = setup.MntDir(), setup.MntDir() diff --git a/tools/integration_tests/flag_optimizations/zonal_bucket_optimization_test.go b/tools/integration_tests/flag_optimizations/zonal_bucket_optimization_test.go new file mode 100644 index 0000000000..6c20579013 --- /dev/null +++ b/tools/integration_tests/flag_optimizations/zonal_bucket_optimization_test.go @@ -0,0 +1,307 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flag_optimizations + +import ( + "fmt" + "log" + "os" + "strings" + "testing" + + "github.com/googlecloudplatform/gcsfuse/v3/cfg" + "github.com/googlecloudplatform/gcsfuse/v3/internal/kernelparams" + "github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations" + "github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/setup" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "golang.org/x/sys/unix" +) + +const ( + // kernelReaderInitMsg indicates the MRD pool initialization. + // MRD pool is used only by kernel reader. This confirms that the kernel reader is enabled and initializing. + kernelReaderInitMsg = "Initializing MRD Pool with size:" + + // fileCacheMsg indicates that the file cache is being used. + fileCacheMsg = "FileCache(" + + // bufferedReaderSchedMsg indicates the buffered reader is scheduling a block download. + bufferedReaderSchedMsg = "Scheduling block:" + + // readFileStartMsg indicates the start of a ReadFile operation. + readFileStartMsg = "<- ReadFile" + + // readFileEndMsg indicates the completion of a ReadFile operation. + readFileEndMsg = "-> ReadFile" +) + +//////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////// + +func (s *KernelReaderParamsSuite) verifyKernelParam(path string, expectedVal string, optimizedVal string) { + s.T().Helper() + content, err := os.ReadFile(path) + require.NoError(s.T(), err) + val := strings.TrimSpace(string(content)) + + if expectedVal != "" { + assert.Equal(s.T(), expectedVal, val, "Param %s mismatch", path) + } else if setup.IsDynamicMount(testEnv.mountDir, testEnv.rootDir) { + assert.NotEqual(s.T(), optimizedVal, val, "Param %s should NOT match optimized default for dynamic mount", path) + } else { + assert.Equal(s.T(), optimizedVal, val, "Param %s should NOT match optimized default", path) + } +} + +func (s *ReadStrategySuite) validateParallelReads(logContent string) { + s.T().Helper() + lines := strings.Split(logContent, "\n") + currentParallelism := 0 + maxParallelism := 0 + for _, line := range lines { + if strings.Contains(line, readFileStartMsg) { + currentParallelism++ + } + if strings.Contains(line, readFileEndMsg) { + currentParallelism-- + } + if currentParallelism > maxParallelism { + maxParallelism = currentParallelism + } + if maxParallelism >= 2 { + break + } + } + assert.Greater(s.T(), maxParallelism, 1, "Expected parallel reads (max parallelism > 1)") +} + +func createAndReadFile(t *testing.T, testName string) { + t.Helper() + testName = strings.ReplaceAll(testName, "/", "_") + fileName := testEnv.testDirPath + "/" + testName + "_test_file.txt" + // Use operations.CreateFileOfSize which uses O_DIRECT to avoid polluting page cache during write. + operations.CreateFileOfSize(10*1024*1024, fileName, t) + require.NoError(t, os.Truncate(setup.LogFile(), 0), "Failed to truncate log file") + + // Read the file using os.ReadFile which uses page cache to trigger kernel readahead. + _, err := os.ReadFile(fileName) + + require.NoError(t, err, "Failed to read file") +} + +//////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////// + +// KernelParamsSuite tests the behavior of zonal bucket optimizations, +// specifically verifying kernel parameters. +type KernelReaderParamsSuite struct { + suite.Suite + flags []string + expectedReadAhead string + expectedMaxBackground string + expectedCongestionThreshold string +} + +func (s *KernelReaderParamsSuite) SetupSuite() { + mustMountGCSFuseAndSetupTestDir(s.flags, testEnv.ctx, testEnv.storageClient) +} + +func (s *KernelReaderParamsSuite) TearDownSuite() { + tearDownOptimizationTest(s.T()) +} + +// TestKernelParamVerification verifies the values of max_read_ahead_kb, +// max_background, and congestion_threshold for Zonal Buckets. +// For non dynamic ZB mounts, they should be updated to the optimized values +// (unless explicitly changed via config or CLI). +func (s *KernelReaderParamsSuite) TestKernelParamVerification() { + // Verify kernel parameters in /sys + var stat unix.Stat_t + err := unix.Stat(setup.MntDir(), &stat) + require.NoError(s.T(), err) + devMajor := unix.Major(stat.Dev) + devMinor := unix.Minor(stat.Dev) + readAheadPath, err := kernelparams.PathForParam(kernelparams.MaxReadAheadKb, devMajor, devMinor) + require.NoError(s.T(), err) + maxBackgroundPath, err := kernelparams.PathForParam(kernelparams.MaxBackgroundRequests, devMajor, devMinor) + require.NoError(s.T(), err) + congestionThresholdPath, err := kernelparams.PathForParam(kernelparams.CongestionWindowThreshold, devMajor, devMinor) + require.NoError(s.T(), err) + + optimizedReadAhead := "16384" + optimizedMaxBackground := fmt.Sprintf("%d", cfg.DefaultMaxBackground()) + optimizedCongestion := fmt.Sprintf("%d", cfg.DefaultCongestionThreshold()) + + s.verifyKernelParam(readAheadPath, s.expectedReadAhead, optimizedReadAhead) + s.verifyKernelParam(maxBackgroundPath, s.expectedMaxBackground, optimizedMaxBackground) + s.verifyKernelParam(congestionThresholdPath, s.expectedCongestionThreshold, optimizedCongestion) +} + +func TestZonalBucketOptimizations(t *testing.T) { + if setup.IsDynamicMount(testEnv.mountDir, testEnv.rootDir) { + t.Skip("Skipping test for dynamic mounting") + } + flagsSet := setup.BuildFlagSets(testEnv.cfg, testEnv.bucketType, t.Name()) + for _, flags := range flagsSet { + t.Run("", func(t *testing.T) { + log.Printf("Running tests with flags: %s", flags) + s := &KernelReaderParamsSuite{ + flags: flags, + } + suite.Run(t, s) + }) + } +} + +func TestZonalBucketOptimizations_ExplicitOverrides(t *testing.T) { + if setup.IsDynamicMount(testEnv.mountDir, testEnv.rootDir) { + t.Skip("Skipping test for dynamic mounting") + } + flagsSet := setup.BuildFlagSets(testEnv.cfg, testEnv.bucketType, t.Name()) + for _, flags := range flagsSet { + t.Run("", func(t *testing.T) { + log.Printf("Running tests with flags: %s", flags) + s := &KernelReaderParamsSuite{ + flags: flags, + expectedReadAhead: "2048", + expectedMaxBackground: "50", + expectedCongestionThreshold: "30", + } + suite.Run(t, s) + }) + } +} + +func TestZonalBucketOptimizations_Dynamic(t *testing.T) { + if !setup.IsDynamicMount(testEnv.mountDir, testEnv.rootDir) { + t.Skip("Skipping test for non dynamic mounting") + } + flags := []string{"--log-severity=trace"} + log.Printf("Running tests with flags: %s", flags) + s := &KernelReaderParamsSuite{ + flags: flags, + } + suite.Run(t, s) +} + +// ReadStrategySuite tests the behavior of the kernel reader under different configurations, +// verifying log output and read parallelism. +type ReadStrategySuite struct { + suite.Suite + flags []string + expectedLog string + unexpectedLog string + validateParallelism bool +} + +func (s *ReadStrategySuite) SetupSuite() { + err := mountGCSFuseAndSetupTestDir(s.flags, testEnv.ctx, testEnv.storageClient) + require.NoError(s.T(), err) +} + +func (s *ReadStrategySuite) TearDownSuite() { + setup.SaveGCSFuseLogFileInCaseOfFailure(s.T()) + setup.UnmountGCSFuseAndDeleteLogFile(testEnv.rootDir) +} + +// TestKernelReaderBehavior verifies the read strategy behavior based on flags. +// Specifically for Zonal Buckets, it checks that if not explicitly disabled, +// Kernel Reader is used (taking precedence) even if Buffered Read or File Cache +// are enabled. +func (s *ReadStrategySuite) TestKernelReaderBehavior() { + createAndReadFile(s.T(), s.T().Name()) + + logContent, err := os.ReadFile(setup.LogFile()) + + require.NoError(s.T(), err, "Failed to read log file") + if s.expectedLog != "" { + assert.Contains(s.T(), string(logContent), s.expectedLog, "Expected log '%s' not found in logs", s.expectedLog) + } + if s.unexpectedLog != "" { + assert.NotContains(s.T(), string(logContent), s.unexpectedLog, "Unexpected log '%s' found in logs", s.unexpectedLog) + } + if s.validateParallelism { + s.validateParallelReads(string(logContent)) + } +} + +func TestKernelReader(t *testing.T) { + if setup.IsDynamicMount(testEnv.mountDir, testEnv.rootDir) { + t.Skip("Skipping test for dynamic mounting") + } + testCases := []struct { + configName string + expectedLog string + unexpectedLog string + validateParallelism bool + }{ + { + configName: "TestKernelReader_DefaultAndPrecedence", + expectedLog: kernelReaderInitMsg, + validateParallelism: true, + }, + { + configName: "TestFileCache_KernelReaderDisabled", + expectedLog: fileCacheMsg, + unexpectedLog: kernelReaderInitMsg, + validateParallelism: false, + }, + { + configName: "TestBufferedReader_KernelReaderDisabled", + expectedLog: bufferedReaderSchedMsg, + unexpectedLog: kernelReaderInitMsg, + validateParallelism: false, + }, + } + + for _, tc := range testCases { + flagsSet := setup.BuildFlagSets(testEnv.cfg, testEnv.bucketType, tc.configName) + for _, flags := range flagsSet { + t.Run(tc.configName, func(t *testing.T) { + log.Printf("Running tests with flags: %s", flags) + s := &ReadStrategySuite{ + flags: flags, + expectedLog: tc.expectedLog, + unexpectedLog: tc.unexpectedLog, + validateParallelism: tc.validateParallelism, + } + suite.Run(t, s) + }) + } + } +} + +func TestKernelReader_Dynamic(t *testing.T) { + if !setup.IsDynamicMount(testEnv.mountDir, testEnv.rootDir) { + t.Skip("Skipping test for non dynamic mounting") + } + configName := "TestKernelReader_Dynamic" + flagsSet := setup.BuildFlagSets(testEnv.cfg, testEnv.bucketType, configName) + for _, flags := range flagsSet { + t.Run(configName, func(t *testing.T) { + log.Printf("Running tests with flags: %s", flags) + s := &ReadStrategySuite{ + flags: flags, + unexpectedLog: kernelReaderInitMsg, + validateParallelism: false, + } + suite.Run(t, s) + }) + } +} diff --git a/tools/integration_tests/test_config.yaml b/tools/integration_tests/test_config.yaml index 1e938e1e7c..4a68005d8a 100644 --- a/tools/integration_tests/test_config.yaml +++ b/tools/integration_tests/test_config.yaml @@ -698,6 +698,57 @@ flag_optimizations: hns: false zonal: false run_on_gke: true + - run: TestZonalBucketOptimizations + flags: + - "--log-severity=trace" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestZonalBucketOptimizations_ExplicitOverrides + flags: + - "--implicit-dirs,--max-read-ahead-kb=2048,--max-background=50,--congestion-threshold=30,--log-severity=trace" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestKernelReader_DefaultAndPrecedence + flags: + - "--implicit-dirs,--log-severity=trace" + - "--implicit-dirs,--log-severity=trace,--cache-dir=/gcsfuse-tmp/TestKernelReader_DefaultAndPrecedence_FileCache" + - "--implicit-dirs,--log-severity=trace,--enable-buffered-read=true" + - "--implicit-dirs,--log-severity=trace,--enable-buffered-read=true,--cache-dir=/gcsfuse-tmp/TestKernelReader_DefaultAndPrecedence_Both" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestFileCache_KernelReaderDisabled + flags: + - "--implicit-dirs,--log-severity=trace,--enable-kernel-reader=false,--cache-dir=/gcsfuse-tmp/TestFileCache_KernelReaderDisabled" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestBufferedReader_KernelReaderDisabled + flags: + - "--implicit-dirs,--log-severity=trace,--enable-kernel-reader=false,--enable-buffered-read" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestKernelReader_Dynamic + flags: + - "--implicit-dirs,--log-severity=trace" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false unsupported_path: - mounted_directory: "${MOUNTED_DIR}" @@ -882,3 +933,43 @@ monitoring: zonal: true run: "TestPromGrpcMetricsSuite" run_on_gke: true + +kernel_reader: + - mounted_directory: "${MOUNTED_DIR}" + test_bucket: "${BUCKET_NAME}" + configs: + - run: TestFileCache_KernelReaderDisabled + flags: + - "--implicit-dirs,--log-severity=trace,--enable-kernel-reader=false,--cache-dir=/gcsfuse-tmp/TestFileCache_KernelReaderDisabled" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestZonalBucketOptimizations_Dynamic + flags: + - "--log-severity=trace" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestKernelReader_DefaultAndPrecedence + flags: + - "--implicit-dirs,--log-severity=trace" + - "--implicit-dirs,--log-severity=trace,--cache-dir=/gcsfuse-tmp/TestKernelReader_DefaultAndPrecedence_FileCache" + - "--implicit-dirs,--log-severity=trace,--enable-buffered-read=true" + - "--implicit-dirs,--log-severity=trace,--enable-buffered-read=true,--cache-dir=/gcsfuse-tmp/TestKernelReader_DefaultAndPrecedence_Both" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false + - run: TestBufferedReader_KernelReaderDisabled + flags: + - "--implicit-dirs,--log-severity=trace,--enable-kernel-reader=false,--enable-buffered-read" + compatible: + flat: false + hns: false + zonal: true + run_on_gke: false diff --git a/tools/integration_tests/util/setup/setup.go b/tools/integration_tests/util/setup/setup.go index 6e08cc3e56..ae5fb92c5a 100644 --- a/tools/integration_tests/util/setup/setup.go +++ b/tools/integration_tests/util/setup/setup.go @@ -827,6 +827,12 @@ func GetGCERegion(gceZone string) (string, error) { return region, nil } +// IsDynamicMount returns true if the mount is dynamic. +// In dynamic mounts, rootDir contains all buckets, and mountDir is the specific bucket directory. +func IsDynamicMount(mountDir, rootDir string) bool { + return mountDir != rootDir +} + // ExtractServiceVersionFromFlags parses the cloud-profiler-label from a slice of flag strings. func ExtractServiceVersionFromFlags(flags []string) string { // Regex to find --cloud-profiler-label=some_value or --cloud-profiler-label some_value