diff --git a/NOTICE.txt b/NOTICE.txt index 31edf7da9e5a..5bb0c9a61276 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -10941,11 +10941,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-libs -Version: v0.26.2 +Version: v0.28.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.26.2/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.28.0/LICENSE: Apache License Version 2.0, January 2004 @@ -30674,13 +30674,13 @@ Contents of probable licence file $GOMODCACHE/go.uber.org/mock@v0.5.0/LICENSE: -------------------------------------------------------------------------------- Dependency : go.uber.org/zap -Version: v1.27.0 +Version: v1.27.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/go.uber.org/zap@v1.27.0/LICENSE: +Contents of probable licence file $GOMODCACHE/go.uber.org/zap@v1.27.1/LICENSE: -Copyright (c) 2016-2017 Uber Technologies, Inc. +Copyright (c) 2016-2024 Uber Technologies, Inc. 
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/changelog/fragments/1765318668-Support-chroot-in-journald.yaml b/changelog/fragments/1765318668-Support-chroot-in-journald.yaml new file mode 100644 index 000000000000..f4676024b231 --- /dev/null +++ b/changelog/fragments/1765318668-Support-chroot-in-journald.yaml @@ -0,0 +1,42 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: | + The Journald input now supports setting a chroot to use when calling + the journalctl binary, thus allowing the journald input to be used + with the wolfi container variant and in environments where the + host's Journald is not compatible with the `journalctl` version + shipped with the container. + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Add support in the journald input for using chroot when calling + `journalctl`. 
In a container environment this allows to mount the host + file system into the container and use its `journalctl`, which + prevents any sort of incompatibility between the `journalctl` in the + container image and the host Journald. Allows using the journald input with Wolfi based Docker containers. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/beats/pull/48008 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/beats/issues/47164 diff --git a/docs/reference/filebeat/filebeat-input-journald.md b/docs/reference/filebeat/filebeat-input-journald.md index f0f77abbf2b7..00a2473121bd 100644 --- a/docs/reference/filebeat/filebeat-input-journald.md +++ b/docs/reference/filebeat/filebeat-input-journald.md @@ -15,16 +15,21 @@ The Wolfi-based Docker image does not contain the `journalctl` binary and the `j ::: :::{important} -When using the Journald input from a Docker container, make sure the -`journalctl` binary in the container is compatible with your -Systemd/journal version. 
To get the version of the `journalctl` binary -in Filebeat's image run the following, adjusting the image name/tag -according to the version that you are running: +When using the Journald input from a Docker container, make sure that +either: + - {applies_to}`stack: ga 9.3.0` [`chroot`](#filebeat-input-journald-chroot), [`journalctl_path`](#filebeat-input-journald-journalctl-path) are set, or + - The `journalctl` binary in the container is compatible with your + Systemd/journal version. To get the version of the `journalctl` binary + in Filebeat's image run the following, adjusting the image name/tag + according to the version that you are running: + + + ```sh + docker run --rm -it --entrypoint "journalctl" docker.elastic.co/beats/filebeat: --version + ``` + The container variants that contain `journalctl` are: `filebeat`, + `filebeat-oss` and `filebeat-ubi` - -```sh -docker run --rm -it --entrypoint "journalctl" docker.elastic.co/beats/filebeat-wolfi: --version -``` ::: If the `journalctl` process exits unexpectedly the journald input will terminate with an error and Filebeat will need to be restarted to start reading from the journal again. @@ -110,8 +115,27 @@ input will only ingest the journal files found when started. New files will not be ingested. ::: - - +### `chroot` [filebeat-input-journald-chroot] +```{applies_to} +stack: ga 9.3.0 +``` +A folder to be used as chroot when calling `journalctl`. This allows +Filebeat to call the host's `journalctl` directly. If using this +option in a container, the container needs `CAP_SYS_CHROOT` to start +the chroot and {{filebeat}} needs permissions to read the desired +journals, usually being added to `systemd-journal` group. +Using chroot requires +[`journalctl_path`](#filebeat-input-journald-journalctl-path) to be +set. + +### `journalctl_path` [filebeat-input-journald-journalctl-path] +```{applies_to} +stack: ga 9.3.0 +``` +The absolute path for the `journalctl` binary. 
If not set {{filebeat}} +will look for `journalctl` in `PATH`. When using +[`chroot`](#filebeat-input-journald-chroot), `journalctl_path` must be +an absolute path from within the chroot directory. ### `merge` [filebeat-input-journald-merge] diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index d1d8c1cfbc19..54742a71ee51 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -821,6 +821,15 @@ filebeat.inputs: #paths: #- /var/log/custom.journal + # Specify a folder to be used as chroot when calling the journalctl binary + #chroot: + + # The absolute path for the `journalctl` binary. If not set Filebeat + # will look for `journalctl` in `PATH`. When using + # `chroot`, `journalctl_path` must be + # an absolute path from within the chroot directory. + #journalctl_path: + # When enabled, log entries will be ingested interleaved from all # available journals, including remote ones. #merge: false diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index dda556ae2133..ad3f6262fcce 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1234,6 +1234,15 @@ filebeat.inputs: #paths: #- /var/log/custom.journal + # Specify a folder to be used as chroot when calling the journalctl binary + #chroot: + + # The absolute path for the `journalctl` binary. If not set Filebeat + # will look for `journalctl` in `PATH`. When using + # `chroot`, `journalctl_path` must be + # an absolute path from within the chroot directory. + #journalctl_path: + # When enabled, log entries will be ingested interleaved from all # available journals, including remote ones. 
#merge: false diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index c35e3148fcc8..a129a783b39d 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// This file was contributed to by generative AI + //go:build linux package journald @@ -76,6 +78,18 @@ type config struct { // Allow ingesting log entries interleaved from all available journals, // including remote ones. Merge bool `config:"merge"` + + // Chroot is the chroot folder used to call journalctl + Chroot string `config:"chroot"` + + // JournalctlPath specifies the path to the `journalctl` binary. + // This field is required only if the Chroot option is set, as the + // input needs to locate the binary within the chroot environment. + // If Chroot is set, JournalctlPath must be an absolute path within + // the chroot environment. If JournalctlPath is not explicitly set, + // it defaults to `journalctl`, which assumes that the `journalctl` + // binary is available in the system's `PATH` environment variable. 
+ JournalctlPath string `config:"journalctl_path"` } // bwcIncludeMatches is a wrapper that accepts include_matches configuration @@ -107,5 +121,6 @@ func defaultConfig() config { return config{ Seek: journalctl.SeekHead, SaveRemoteHostname: false, + JournalctlPath: "journalctl", } } diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index c7d53e9c4e92..6bcf9ba2c7c4 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -22,6 +22,8 @@ package journald import ( "errors" "fmt" + "os" + "path/filepath" "strconv" "time" @@ -57,8 +59,9 @@ type journald struct { Facilities []int SaveRemoteHostname bool Parsers parser.Config - Journalctl bool Merge bool + Chroot string + JournalctlPath string } type checkpoint struct { @@ -112,6 +115,21 @@ func Configure(cfg *conf.C, _ *logp.Logger) ([]cursor.Source, cursor.Input, erro sources[i] = pathSource(p) } + if config.Chroot != "" { + chrootStat, err := os.Stat(config.Chroot) + if err != nil { + return nil, nil, fmt.Errorf("cannot stat chroot: %w", err) + } + if !chrootStat.IsDir() { + return nil, nil, fmt.Errorf("provided chroot (%s) is not a directory", config.Chroot) + } + + fullPath := filepath.Join(config.Chroot, config.JournalctlPath) + if _, err := os.Stat(fullPath); err != nil { + return nil, nil, fmt.Errorf("cannot stat journalctl binary in chroot: %w", err) + } + } + return sources, &journald{ ID: config.ID, Since: config.Since, @@ -124,6 +142,8 @@ func Configure(cfg *conf.C, _ *logp.Logger) ([]cursor.Source, cursor.Input, erro SaveRemoteHostname: config.SaveRemoteHostname, Parsers: config.Parsers, Merge: config.Merge, + Chroot: config.Chroot, + JournalctlPath: config.JournalctlPath, }, nil } @@ -143,7 +163,7 @@ func (inp *journald) Test(src cursor.Source, ctx input.TestContext) error { inp.Since, src.Name(), inp.Merge, - journalctl.Factory, + journalctl.NewFactory(inp.Chroot, inp.JournalctlPath), ) if err != nil { return err @@ -179,7 +199,7 @@ func (inp 
*journald) Run( inp.Since, src.Name(), inp.Merge, - journalctl.Factory, + journalctl.NewFactory(inp.Chroot, inp.JournalctlPath), ) if err != nil { wrappedErr := fmt.Errorf("could not start journal reader: %w", err) diff --git a/filebeat/input/journald/pkg/journalctl/chroot_test.go b/filebeat/input/journald/pkg/journalctl/chroot_test.go new file mode 100644 index 000000000000..00ad6e60e651 --- /dev/null +++ b/filebeat/input/journald/pkg/journalctl/chroot_test.go @@ -0,0 +1,251 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This file was contributed to by generative AI + +//go:build linux && integration + +package journalctl + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/client" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/testing/fs" +) + +// TestNewFactoryChroot starts a docker container mounting / as /hostfs and +// the current directory as /workspace. 
The container runs TestInDockerNewFactory +// that sets the chroot to access the host's journalctl +func TestNewFactoryChroot(t *testing.T) { + containerChroot := "/hostfs" + + // Create Docker client + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + require.NoError(t, err, "failed to create docker client") + defer cli.Close() + + // Find the project root (where go.mod is) + projectRoot := findProjectRoot(t) + imageName := getImageName(t, projectRoot) + + // Pull the image if not present + pullImage(t, cli, imageName) + + // Find the absolute path to journalctl inside the chroot + journalctlPath, err := exec.LookPath("journalctl") + require.NoError(t, err, "cannot look path for journalctl") + + tempDir := fs.TempDir(t, "..", "..", "..", "..", "build") + + // Create container configuration + containerConfig := &container.Config{ + Image: imageName, + Cmd: []string{"go", "test", "-v", "-count=1", "-tags=integration", "-run=TestInDockerNewFactory"}, + Tty: true, + AttachStdin: false, + WorkingDir: "/workspace/filebeat/input/journald/pkg/journalctl", + Env: []string{ + "IN_DOCKER_CONTAINER=true", + fmt.Sprintf("JOURNALCTL_PATH=%s", journalctlPath), + fmt.Sprintf("CHROOT_PATH=%s", containerChroot), + fmt.Sprintf("TEST_TEMP_DIR=%s", filepath.Join(containerChroot, tempDir)), + }, + } + + hostConfig := &container.HostConfig{ + Mounts: []mount.Mount{ + { + Type: mount.TypeBind, + Source: "/", + Target: containerChroot, + }, + { + Type: mount.TypeBind, + Source: projectRoot, + Target: "/workspace", + }, + }, + CapAdd: []string{"CAP_SYS_CHROOT"}, // Required for chroot + AutoRemove: true, + } + + // Create the container + ctx := t.Context() + createResp, err := cli.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, "") + require.NoError(t, err, "failed to create container") + containerID := createResp.ID + + // Start the container + err = cli.ContainerStart(ctx, containerID, container.StartOptions{}) + require.NoError(t, 
err, "failed to start container") + attachResp, err := cli.ContainerAttach(ctx, containerID, container.AttachOptions{ + Stream: true, + Stdout: true, + Stderr: true, + }) + require.NoErrorf(t, err, "cannot attach to container: %d", err) + + containerLogs := fs.NewLogFile(t, tempDir, "docker-container-*.log") + go func() { + _, err := io.Copy(containerLogs, attachResp.Reader) + if err != nil { + t.Logf("could not fully copy container logs: %s", err) + } + }() + + waitRespChan, waitErrChan := cli.ContainerWait(ctx, containerID, container.WaitConditionRemoved) + select { + case r := <-waitRespChan: + if r.StatusCode != 0 { + t.Errorf("Test in container failed, returned status: %d.", r.StatusCode) + if r.Error != nil { + t.Logf("ContainerWait response error: %s", r.Error.Message) + } + + logDockerCmd(t, imageName, containerConfig, hostConfig)() + t.Log("Check the docker container logs for more information") + } + case err := <-waitErrChan: + t.Fatalf("error waiting for container to finish: %s", err) + } +} + +func TestInDockerNewFactory(t *testing.T) { + if os.Getenv("IN_DOCKER_CONTAINER") != "true" { + t.Skip("Skipping test - must run inside Docker container with IN_DOCKER_CONTAINER=true") + } + + journalctlPath := os.Getenv("JOURNALCTL_PATH") + require.NotEmpty(t, journalctlPath, "JOURNALCTL_PATH must be set") + + chrootPath := os.Getenv("CHROOT_PATH") + require.NotEmpty(t, chrootPath, "CHROOT_PATH must be set") + + tempDir := os.Getenv("TEST_TEMP_DIR") + require.NotEmpty(t, tempDir, "TEST_TEMP_DIR be set") + + jctlCtx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + logger := logptest.NewFileLogger(t, tempDir) + factory := NewFactory(chrootPath, journalctlPath) + + // Try to read version output, this ensures we can call journalctl + // without the need of any messages in the journal + jctl, err := factory(jctlCtx, logger.Logger, "--version") + require.NoError(t, err, "failed to create journalctl with chroot") + defer jctl.Kill() // nolint: deadcode 
// It's a test, there is nothing to do + + data, err := jctl.Next(jctlCtx) + require.NoError(t, err, "failed to read from journalctl") + require.NotEmpty(t, data, "expected output from journalctl --version") +} + +func pullImage(t *testing.T, cli *client.Client, imageName string) { + reader, err := cli.ImagePull(t.Context(), imageName, image.PullOptions{}) + require.NoErrorf(t, err, "failed to pull image: %q", imageName) + defer reader.Close() + + // Wait for pull to complete + _, err = io.Copy(io.Discard, reader) + require.NoError(t, err, "failed to read image pull output") +} + +func findProjectRoot(t *testing.T) string { + startDir, err := os.Getwd() + require.NoError(t, err, "failed to get working directory") + + // Add a level so we start looking at the current directory + dir := filepath.Join(startDir, "foo") + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir + } + + parent := filepath.Dir(dir) + if parent == "/" { + // Reached root without finding go.mod + t.Fatal("go.mod not found") + } + + dir = parent + } +} + +func getImageName(t *testing.T, projectRoot string) string { + // Construct the path to the .go-version file + goVersionPath := filepath.Join(projectRoot, ".go-version") + + // Read the contents of the .go-version file + data, err := os.ReadFile(goVersionPath) + require.NoError(t, err, "failed to read .go-version file") + + // Trim leading and trailing spaces from the version string + version := strings.TrimSpace(string(data)) + + imageName := "golang:" + version + "-alpine" + return imageName +} + +func logDockerCmd( + t *testing.T, + imageName string, + containerConfig *container.Config, + hostConfig *container.HostConfig) func() { + return func() { + t.Logf("To reproduce, you can run the following Docker command:") + + // Construct the environment variables + var envVars []string + for _, env := range containerConfig.Env { + envVars = append(envVars, "-e "+env) + } + + // Construct the volume mounts + var 
volumeMounts []string + for _, m := range hostConfig.Mounts { + mountOption := "-v " + m.Source + ":" + m.Target + volumeMounts = append(volumeMounts, mountOption) + } + + // Construct the docker run command + dockerRunCmd := strings.Join([]string{ + "docker run --rm", + strings.Join(envVars, " "), + strings.Join(volumeMounts, " "), + "-w " + containerConfig.WorkingDir, + imageName, + strings.Join(containerConfig.Cmd, " "), + }, " ") + + t.Log(dockerRunCmd) + } +} diff --git a/filebeat/input/journald/pkg/journalctl/journalctl.go b/filebeat/input/journald/pkg/journalctl/journalctl.go index 47f2078b7075..0f009858e294 100644 --- a/filebeat/input/journald/pkg/journalctl/journalctl.go +++ b/filebeat/input/journald/pkg/journalctl/journalctl.go @@ -28,6 +28,7 @@ import ( "os/exec" "strings" "sync" + "syscall" input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/elastic-agent-libs/logp" @@ -44,119 +45,135 @@ type journalctl struct { waitDone sync.WaitGroup } -// Factory returns an instance of journalctl ready to use. +// NewFactory returns a function that instantiates [journalctl]. // The caller is responsible for calling Kill to ensure the // journalctl process created is correctly terminated. // +// If chroot is non-empty, then journalctlPath must be non-empty and be +// the absolute path to the journalctl binary inside the chroot. +// // The returned type is an interface to allow mocking for testing -func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) { - cmd := exec.Command(binary, args...) - - jctl := journalctl{ - canceler: canceller, - cmd: cmd, - dataChan: make(chan []byte), - logger: logger, - } +func NewFactory(chroot, journalctlPath string) JctlFactory { + return func(canceller input.Canceler, logger *logp.Logger, args ...string) (Jctl, error) { + cmd := exec.Command(journalctlPath, args...) 
- var err error - jctl.stdout, err = cmd.StdoutPipe() - if err != nil { - return &journalctl{}, fmt.Errorf("cannot get stdout pipe: %w", err) - } - jctl.stderr, err = cmd.StderrPipe() - if err != nil { - return &journalctl{}, fmt.Errorf("cannot get stderr pipe: %w", err) - } - - // This gorroutune reads the stderr from the journalctl process, if the - // process exits for any reason, then its stderr is closed, this goroutine - // gets an EOF error and exits - go func() { - defer jctl.logger.Debug("stderr reader goroutine done") - reader := bufio.NewReader(jctl.stderr) - for { - line, err := reader.ReadString('\n') - if err != nil { - if !errors.Is(err, io.EOF) { - logger.Errorf("cannot read from journalctl stderr: %s", err) - } - return + if chroot != "" { + cmd.SysProcAttr = &syscall.SysProcAttr{ + Chroot: chroot, } + } - logger.Errorf("Journalctl wrote to stderr: %s", line) + jctl := journalctl{ + canceler: canceller, + cmd: cmd, + dataChan: make(chan []byte), + logger: logger, } - }() - - // This goroutine reads the stdout from the journalctl process and makes - // the data available via the `Next()` method. - // If the journalctl process exits for any reason, then its stdout is closed - // this goroutine gets an EOF error and exits. - go func() { - defer jctl.logger.Debug("stdout reader goroutine done") - defer close(jctl.dataChan) - reader := bufio.NewReader(jctl.stdout) - for { - data, err := reader.ReadBytes('\n') - if err != nil { - if !errors.Is(err, io.EOF) { - var logError = false - var pathError *fs.PathError - if errors.As(err, &pathError) { - // Because we're reading from the stdout from a process that will - // eventually exit, it can happen that when reading we get the - // fs.PathError below instead of an io.EOF. This is expected, - // it only means the process has exited, its stdout has been - // closed and there is nothing else for us to read. - // This is expected and does not cause any data loss. 
- // So we log at level debug to have it in our logs if ever needed - // while avoiding adding error level logs on user's deployments - // for situations that are well handled. - if pathError.Op == "read" && - pathError.Path == "|0" && - pathError.Err.Error() == "file already closed" { - logger.Debugf("cannot read from journalctl stdout: '%s'", err) + + var err error + jctl.stdout, err = cmd.StdoutPipe() + if err != nil { + return &journalctl{}, fmt.Errorf("cannot get stdout pipe: %w", err) + } + jctl.stderr, err = cmd.StderrPipe() + if err != nil { + return &journalctl{}, fmt.Errorf("cannot get stderr pipe: %w", err) + } + + // This gorroutune reads the stderr from the journalctl process, if the + // process exits for any reason, then its stderr is closed, this goroutine + // gets an EOF error and exits + go func() { + defer jctl.logger.Debug("stderr reader goroutine done") + reader := bufio.NewReader(jctl.stderr) + for { + line, err := reader.ReadString('\n') + if err != nil { + if !errors.Is(err, io.EOF) { + logger.Errorf("cannot read from journalctl stderr: %s", err) + } + return + } + + logger.Errorf("Journalctl wrote to stderr: %s", line) + } + }() + + // This goroutine reads the stdout from the journalctl process and makes + // the data available via the `Next()` method. + // If the journalctl process exits for any reason, then its stdout is closed + // this goroutine gets an EOF error and exits. + go func() { + defer jctl.logger.Debug("stdout reader goroutine done") + defer close(jctl.dataChan) + reader := bufio.NewReader(jctl.stdout) + for { + data, err := reader.ReadBytes('\n') + if err != nil { + if !errors.Is(err, io.EOF) { + var logError = false + var pathError *fs.PathError + if errors.As(err, &pathError) { + // Because we're reading from the stdout from a process that will + // eventually exit, it can happen that when reading we get the + // fs.PathError below instead of an io.EOF. 
This is expected, + // it only means the process has exited, its stdout has been + // closed and there is nothing else for us to read. + // This is expected and does not cause any data loss. + // So we log at level debug to have it in our logs if ever needed + // while avoiding adding error level logs on user's deployments + // for situations that are well handled. + if pathError.Op == "read" && + pathError.Path == "|0" && + pathError.Err.Error() == "file already closed" { + logger.Debugf("cannot read from journalctl stdout: '%s'", err) + } else { + logError = true + } } else { logError = true } - } else { - logError = true - } - if logError { - logger.Errorf("cannot read from journalctl stdout: '%s'", err) + if logError { + logger.Errorf("cannot read from journalctl stdout: '%s'", err) + } } + return } - return - } - select { - case <-jctl.canceler.Done(): - return - case jctl.dataChan <- data: + select { + case <-jctl.canceler.Done(): + return + case jctl.dataChan <- data: + } } + }() + + jctlCommandMsg := fmt.Sprintf("Journalctl command: %s %s", journalctlPath, strings.Join(args, " ")) + if chroot != "" { + jctlCommandMsg = fmt.Sprintf("%s Chroot: %s (command path relative to chroot)", jctlCommandMsg, chroot) } - }() - logger.Infof("Journalctl command: journalctl %s", strings.Join(args, " ")) + logger.Info(jctlCommandMsg) - if err := cmd.Start(); err != nil { - return &journalctl{}, fmt.Errorf("cannot start journalctl: %w", err) - } + if err := cmd.Start(); err != nil { + return &journalctl{}, fmt.Errorf("cannot start journalctl: %w. Chroot: %s", err, chroot) + } - logger.Infof("journalctl started with PID %d", cmd.Process.Pid) + logger.Infof("journalctl started with PID %d", cmd.Process.Pid) - // Whenever the journalctl process exits, the `Wait` call returns, - // if there was an error it is logged and this goroutine exits. 
- jctl.waitDone.Add(1) - go func() { - defer jctl.waitDone.Done() - if err := cmd.Wait(); err != nil { - jctl.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode()) - } - jctl.logger.Debugf("journalctl exit code: %d", cmd.ProcessState.ExitCode()) - }() + // Whenever the journalctl process exits, the `Wait` call returns, + // if there was an error it is logged and this goroutine exits. + jctl.waitDone.Add(1) + go func() { + defer jctl.waitDone.Done() + if err := cmd.Wait(); err != nil { + jctl.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode()) + } + jctl.logger.Debugf("journalctl exit code: %d", cmd.ProcessState.ExitCode()) + }() - return &jctl, nil + return &jctl, nil + } } // Kill Terminates the journalctl process using a SIGKILL. diff --git a/filebeat/input/journald/pkg/journalctl/reader.go b/filebeat/input/journald/pkg/journalctl/reader.go index 958c271f10fd..6f6d5dfa0076 100644 --- a/filebeat/input/journald/pkg/journalctl/reader.go +++ b/filebeat/input/journald/pkg/journalctl/reader.go @@ -55,7 +55,7 @@ type JournalEntry struct { // JctlFactory is a function that returns an instance of journalctl ready to use. // It exists to allow testing -type JctlFactory func(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) +type JctlFactory func(canceller input.Canceler, logger *logp.Logger, args ...string) (Jctl, error) // Jctl abstracts the call to journalctl, it exists only for testing purposes // @@ -240,7 +240,7 @@ func New( func (r *Reader) newJctl(extraArgs ...string) error { args := append(r.args, extraArgs...) - jctl, err := r.jctlFactory(r.canceler, r.jctlLogger, "journalctl", args...) + jctl, err := r.jctlFactory(r.canceler, r.jctlLogger, args...) 
r.jctl = jctl return err diff --git a/filebeat/input/journald/pkg/journalctl/reader_test.go b/filebeat/input/journald/pkg/journalctl/reader_test.go index b34ebd73e556..e7ec8a11e16c 100644 --- a/filebeat/input/journald/pkg/journalctl/reader_test.go +++ b/filebeat/input/journald/pkg/journalctl/reader_test.go @@ -83,7 +83,7 @@ func TestRestartsJournalctlOnError(t *testing.T) { } factoryCalls := atomic.Uint32{} - factory := func(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) { + factory := func(canceller input.Canceler, logger *logp.Logger, args ...string) (Jctl, error) { factoryCalls.Add(1) // Add a log to make debugging easier and better mimic the behaviour of the real factory/journalctl logger.Debugf("starting new mock journalclt ID: %d", factoryCalls.Load()) @@ -162,7 +162,7 @@ func TestRestartsJournalctlOnError(t *testing.T) { } func TestNewUsesMergeFlag(t *testing.T) { - f := func(_ input.Canceler, _ *logp.Logger, _ string, s ...string) (Jctl, error) { + f := func(_ input.Canceler, _ *logp.Logger, s ...string) (Jctl, error) { return nil, nil } r, err := New( diff --git a/filebeat/tests/integration/chroot_test.go b/filebeat/tests/integration/chroot_test.go new file mode 100644 index 000000000000..46b9595844bc --- /dev/null +++ b/filebeat/tests/integration/chroot_test.go @@ -0,0 +1,248 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This file was contributed to by generative AI + +//go:build integration && linux + +package integration + +import ( + "archive/tar" + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/docker/docker/api/types/build" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/client" + "github.com/gofrs/uuid/v5" + + "github.com/elastic/elastic-agent-libs/testing/fs" +) + +func TestJournaldChroot(t *testing.T) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Fatalf("Failed to create Docker client: %s", err) + } + defer cli.Close() + + imageName := "journald-chroot" + syslogID := uuid.Must(uuid.NewV4()).String() + + generateJournaldLogs(t, syslogID, 5, 100) + + tempDir := fs.TempDir(t, filepath.Join("..", "..", "build")) + containerLogFile := fs.NewLogFile(t, tempDir, "container-logs-*.log") + + filebeatPath := buildFilebeatBinary(t, tempDir) + buildDockerImage(t, cli, imageName, tempDir, filebeatPath) + startDockerContainer(t, cli, imageName, syslogID, containerLogFile) + assertJournalctlWorks(t, containerLogFile, syslogID) +} + +func buildDockerImage(t *testing.T, cli *client.Client, imageName, tempDir, filebeatPath string) { + buildContextDir := "testdata/journald_chroot" + + buildOptions := build.ImageBuildOptions{ + Tags: []string{imageName}, + Dockerfile: "Dockerfile", // Dockerfile path relative to the build context + } + buildContext := 
createDockerContext(t, buildContextDir, filebeatPath) + resp, err := cli.ImageBuild(t.Context(), buildContext, buildOptions) + if err != nil { + t.Fatalf("Failed to build Docker image: %s", err) + } + defer resp.Body.Close() + + // Keep the logs in case something goes wrong + f := fs.NewLogFile(t, tempDir, "docker-build-log-*.log") + if _, err := io.Copy(f, resp.Body); err != nil { + t.Logf("cannot read Docker build logs: %s", err) + } +} + +func buildFilebeatBinary(t *testing.T, tempDir string) string { + filebeatPath := filepath.Join(tempDir, "filebeat_static") + cmd := exec.Command( + "go", + "build", + "-ldflags", + "-extldflags \"-static\" -s", + "-tags", + "timetzdata", + "-o", + filebeatPath, + "../../") + cmd.Env = append(os.Environ(), "CGO_ENABLED=0") + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to build Filebeat binary: %s\nOutput: %s", err, output) + } + + return filebeatPath +} + +func startDockerContainer(t *testing.T, cli *client.Client, imageName, syslogID string, logFile *fs.LogFile) string { + ctx := t.Context() + + containerConfig := &container.Config{ + Image: imageName, + Tty: true, + Env: []string{ + "SYSLOG_ID=" + syslogID, + }, + } + hostConfig := &container.HostConfig{ + Mounts: []mount.Mount{ + { + Type: mount.TypeBind, + Source: "/", + Target: "/hostfs", + }, + }, + CapAdd: []string{"CAP_SYS_CHROOT"}, // Required for chroot + AutoRemove: true, + } + + resp, err := cli.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, "") + if err != nil { + t.Fatalf("Failed to create Docker container: %s", err) + } + + // Attach to the container's logs + attachResp, err := cli.ContainerAttach(ctx, resp.ID, container.AttachOptions{ + Stream: true, + Stdout: true, + Stderr: true, + }) + if err != nil { + t.Fatalf("Failed to attach to Docker container logs: %s", err) + } + + // Stream logs to the log file + go func() { + defer attachResp.Close() + if _, err := io.Copy(logFile, attachResp.Reader); err != nil { + 
t.Logf("Error streaming container logs: %s", err) + } + }() + + if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { + t.Fatalf("Failed to start Docker container: %s", err) + } + + t.Cleanup(func() { + // By the time t.Cleanup runs the test context is already cancelled + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := cli.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil { + t.Logf("Failed to stop container %s: %s\n", resp.ID, err) + } + }) + + return resp.ID +} + +func assertJournalctlWorks(t *testing.T, logFile *fs.LogFile, syslogID string) { + t.Helper() + // Wait for the log message "journalctl started with PID XX" + logFile.WaitLogsContains(t, "journalctl started with PID", 30*time.Second, "journalctl did not start") + for range 5 { + logFile.WaitLogsContains(t, syslogID, 5*time.Second, "did not find event") + } +} + +func createDockerContext(t *testing.T, dir, filebeatPath string) io.Reader { + buf := new(bytes.Buffer) + tw := tar.NewWriter(buf) + defer tw.Close() + + // Add the directory contents to the tar archive + err := filepath.Walk(dir, func(file string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + + header, err := tar.FileInfoHeader(fi, file) + if err != nil { + return fmt.Errorf("cannot get FileInfoHeader for %q: %w", file, err) + } + + header.Name, _ = filepath.Rel(dir, file) + if fi.IsDir() { + header.Name += "/" + } + + if err := tw.WriteHeader(header); err != nil { + return err + } + + if !fi.IsDir() { + f, err := os.Open(file) + if err != nil { + return fmt.Errorf("cannot open file: %w", err) + } + defer f.Close() + + if _, err := io.Copy(tw, f); err != nil { + return fmt.Errorf("cannot read %q: %s", file, err) + } + } + + return nil + }) + if err != nil { + t.Fatalf("cannot generate tar archive: %s", err) + } + + // Add the Filebeat binary to the tar archive + filebeatFile, err := os.Open(filebeatPath) + if err != nil { + 
t.Fatalf("failed to open Filebeat binary: %s", err) + } + defer filebeatFile.Close() + + fileInfo, err := filebeatFile.Stat() + if err != nil { + t.Fatalf("failed to stat Filebeat binary: %s", err) + } + + header, err := tar.FileInfoHeader(fileInfo, filebeatPath) + if err != nil { + t.Fatalf("failed to create tar header for Filebeat binary: %s", err) + } + header.Name = "filebeat" // Place the binary in the root of the build context + + if err := tw.WriteHeader(header); err != nil { + t.Fatalf("failed to write tar header for Filebeat binary: %s", err) + } + + if _, err := io.Copy(tw, filebeatFile); err != nil { + t.Fatalf("failed to copy Filebeat binary to tar archive: %s", err) + } + + return buf +} diff --git a/filebeat/tests/integration/testdata/journald_chroot/Dockerfile b/filebeat/tests/integration/testdata/journald_chroot/Dockerfile new file mode 100644 index 000000000000..65d6001be2dc --- /dev/null +++ b/filebeat/tests/integration/testdata/journald_chroot/Dockerfile @@ -0,0 +1,7 @@ +FROM scratch + +COPY alpine-release /etc/alpine-release +COPY filebeat / +COPY filebeat.yml / +ENTRYPOINT ["/filebeat"] +CMD ["-c", "/filebeat.yml", "--strict.perms=false"] diff --git a/filebeat/tests/integration/testdata/journald_chroot/alpine-release b/filebeat/tests/integration/testdata/journald_chroot/alpine-release new file mode 100644 index 000000000000..59b3a909cdd4 --- /dev/null +++ b/filebeat/tests/integration/testdata/journald_chroot/alpine-release @@ -0,0 +1 @@ +42.0.0 diff --git a/filebeat/tests/integration/testdata/journald_chroot/filebeat.yml b/filebeat/tests/integration/testdata/journald_chroot/filebeat.yml new file mode 100644 index 000000000000..d07b53633f79 --- /dev/null +++ b/filebeat/tests/integration/testdata/journald_chroot/filebeat.yml @@ -0,0 +1,17 @@ +filebeat.inputs: + - type: journald + id: journald-input-id + chroot: /hostfs + journalctl_path: /usr/bin/journalctl + syslog_identifiers: + - ${SYSLOG_ID} + +queue.mem: + flush.timeout: 0s + 
+output.console: + enabled: true + pretty: false + +logging: + to_stderr: true diff --git a/go.mod b/go.mod index d6100e98a835..57a169612081 100644 --- a/go.mod +++ b/go.mod @@ -120,7 +120,7 @@ require ( go.elastic.co/go-licence-detector v0.7.0 go.etcd.io/bbolt v1.4.0 go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 + go.uber.org/zap v1.27.1 golang.org/x/crypto v0.45.0 golang.org/x/mod v0.29.0 golang.org/x/net v0.47.0 @@ -172,7 +172,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.8.0 github.com/elastic/elastic-agent-autodiscover v0.10.0 - github.com/elastic/elastic-agent-libs v0.26.2 + github.com/elastic/elastic-agent-libs v0.28.0 github.com/elastic/elastic-agent-system-metrics v0.13.4 github.com/elastic/go-elasticsearch/v8 v8.19.0 github.com/elastic/go-freelru v0.16.0 diff --git a/go.sum b/go.sum index e93585742a1e..6d51948d8be9 100644 --- a/go.sum +++ b/go.sum @@ -374,8 +374,8 @@ github.com/elastic/elastic-agent-autodiscover v0.10.0 h1:WJ4zl9uSfk1kHmn2B/0byQB github.com/elastic/elastic-agent-autodiscover v0.10.0/go.mod h1:Nf3zh9FcJ9nTTswTwDTUAqXmvQllOrNliM6xmORSxwE= github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE= github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= -github.com/elastic/elastic-agent-libs v0.26.2 h1:zwytPWmTWSJG80oa9/5FJ6zue47ysI23eMo15LfeWy0= -github.com/elastic/elastic-agent-libs v0.26.2/go.mod h1:fc2noLqosmQorIGbatJfVeh4CL77yiP8ot16/5umeoM= +github.com/elastic/elastic-agent-libs v0.28.0 h1:UDL9aSxgjqC9TrHAEHgI8gtuhRYPM/1gSfh7ztHWWLw= +github.com/elastic/elastic-agent-libs v0.28.0/go.mod h1:0xUg7alsNE/WhY9DZRIdTYW75nqSHC1octIAg//j/PQ= github.com/elastic/elastic-agent-system-metrics v0.13.4 h1:gX8VdlQyakPcPKFpD7uHv2QLRDyguuKfZgu0LE27V7c= github.com/elastic/elastic-agent-system-metrics v0.13.4/go.mod h1:lB8veYWYBlA9eF6TahmPN87G1IEgWlbep7QSqLSW90U= github.com/elastic/elastic-transport-go/v8 v8.7.0 
h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE= @@ -1238,8 +1238,8 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= diff --git a/libbeat/common/seccomp/policy_linux_386.go b/libbeat/common/seccomp/policy_linux_386.go index 7f52a404aae6..bc462e84e176 100644 --- a/libbeat/common/seccomp/policy_linux_386.go +++ b/libbeat/common/seccomp/policy_linux_386.go @@ -34,6 +34,7 @@ func init() { "capget", "chmod", "chown", + "chroot", "clock_gettime", "clock_nanosleep", "clone", diff --git a/libbeat/common/seccomp/policy_linux_amd64.go b/libbeat/common/seccomp/policy_linux_amd64.go index 55ff70478f8b..87d3bc20c3fc 100644 --- a/libbeat/common/seccomp/policy_linux_amd64.go +++ b/libbeat/common/seccomp/policy_linux_amd64.go @@ -37,6 +37,7 @@ func init() { "capget", "chmod", "chown", + "chroot", "clock_gettime", "clock_nanosleep", "clone", diff --git a/libbeat/common/seccomp/seccomp-profiler-allow.txt b/libbeat/common/seccomp/seccomp-profiler-allow.txt index 425c6160197e..274b1e3cea3e 100644 --- a/libbeat/common/seccomp/seccomp-profiler-allow.txt +++ b/libbeat/common/seccomp/seccomp-profiler-allow.txt @@ -31,6 +31,7 @@ wait4 execve # Jounrald input +chroot dup3 faccessat2 prctl diff 
--git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 7a2db14b1290..5fa6f2d5963e 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2918,6 +2918,15 @@ filebeat.inputs: #paths: #- /var/log/custom.journal + # Specify a folder to be used as chroot when calling the journalctl binary + #chroot: + + # The absolute path for the `journalctl` binary. If not set Filebeat + # will look for `journalctl` in `PATH`. When using + # `chroot`, `journalctl_path` must be + # an absolute path from within the chroot directory. + #journalctl_path: + # When enabled, log entries will be ingested interleaved from all # available journals, including remote ones. #merge: false