Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1765529609-ingesting-GZIP-files-is-GA.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: "GZIP support is GA and always enabled on filestream"

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Ingesting GZIP-compressed files is now GA. The `gzip_experimental` configuration option has been deprecated. Users should use `compression` instead. Refer to the [documentation](https://www.elastic.co/docs/reference/beats/filebeat/filebeat-input-filestream#reading-gzip-files) for more details.

component: filebeat

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/beats/pull/47893

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/beats/issues/47880
49 changes: 36 additions & 13 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

// Compression mode constants: the set of valid values for the filestream
// `compression` configuration option (config.Compression).
const (
	// CompressionNone disables compression handling; all files are treated as
	// plain text.
	CompressionNone = ""
	// CompressionGZIP treats every file as gzip compressed, without
	// inspecting its contents.
	CompressionGZIP = "gzip"
	// CompressionAuto inspects each file to detect gzip compression and
	// decompresses only the files that are gzip; others are read as plain text.
	CompressionAuto = "auto"
)

// config stores the options of a file stream.
type config struct {
Reader readerConfig `config:",inline"`
Expand All @@ -44,10 +55,14 @@ type config struct {
FileWatcher fileWatcherConfig `config:"prospector.scanner"`
FileIdentity *conf.Namespace `config:"file_identity"`

// GZIPExperimental enables beta support for ingesting GZIP files.
// When set to true the input will transparently stream-decompress GZIP files.
// This feature is experimental and subject to change.
GZIPExperimental bool `config:"gzip_experimental"`
// Compression specifies how file compression is handled.
// Valid values: "" (none), "gzip" (all files are gzip), "auto"
// (auto-detect).
Compression string `config:"compression"`

// GZIPExperimental is deprecated and is ignored. Use Compression instead.
// Deprecated.
GZIPExperimental *bool `config:"gzip_experimental"`

// Whether to add the file owner name and group to the event metadata.
// Disabled by default.
Expand Down Expand Up @@ -214,12 +229,12 @@ func (c *config) Validate() error {
}
}

if c.GZIPExperimental {
// Validate file_identity must be fingerprint when gzip support is enabled.
if c.FileIdentity != nil && c.FileIdentity.Name() != fingerprintName {
return fmt.Errorf(
"gzip_experimental=true requires file_identity to be 'fingerprint'")
}
switch c.Compression {
case CompressionNone, CompressionGZIP, CompressionAuto:
// valid values
default:
return fmt.Errorf("invalid compression value %q, must be one of: %q, %q, %q",
c.Compression, CompressionNone, CompressionGZIP, CompressionAuto)
}

if c.ID == "" && c.TakeOver.Enabled {
Expand All @@ -237,9 +252,17 @@ func (c config) checkUnsupportedParams(logger *logp.Logger) {
"duplication and incomplete input metrics, it's use is " +
"highly discouraged.")
}
if c.GZIPExperimental {
logger.Named("filestream").Warn(cfgwarn.Beta(
"filestream: beta gzip support enabled"))
if c.GZIPExperimental != nil {
logger.Named("filestream").Warn(cfgwarn.Deprecate(
"",
"'gzip_experimental' is deprecated and ignored, set 'compression' instead"))
}

// file_identity should be fingerprint when compression is enabled.
if (c.Compression == CompressionGZIP || c.Compression == CompressionAuto) &&
(c.FileIdentity != nil && c.FileIdentity.Name() != fingerprintName) {
logger.Warnf(
"compression='%s' requires file_identity to be 'fingerprint'", c.Compression)
}
}

Expand Down
53 changes: 25 additions & 28 deletions filebeat/input/filestream/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,32 @@ func TestConfigValidate(t *testing.T) {
assert.Error(t, err)
})

t.Run("gzip_experimental works with file_identity.fingerprint", func(t *testing.T) {
c, err := conf.NewConfigFrom(`
id: 'some id'
paths: [/foo/bar*]
gzip_experimental: true
file_identity.fingerprint: ~
`)
require.NoError(t, err, "could not create config from string")
got := defaultConfig()
err = c.Unpack(&got)
require.NoError(t, err, "could not unpack config")

err = got.Validate()
assert.NoError(t, err)
})
t.Run("compression validation", func(t *testing.T) {
tcs := []struct {
name string
compression string
wantErr string
}{
{name: "none is valid", compression: CompressionNone},
{name: "gzip is valid", compression: CompressionGZIP},
{name: "auto is valid", compression: CompressionAuto},
{name: "invalid value returns error", compression: "invalid", wantErr: `invalid compression value "invalid"`},
}

t.Run("gzip_experimental requires file_identity.fingerprint", func(t *testing.T) {
c, err := conf.NewConfigFrom(`
id: 'some id'
paths: [/foo/bar*]
gzip_experimental: true
file_identity.path: ~
`)
require.NoError(t, err, "could not create config from string")
got := defaultConfig()
err = c.Unpack(&got)
assert.ErrorContains(t,
err,
"gzip_experimental=true requires file_identity to be 'fingerprint")
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
c := config{
Paths: []string{"/foo/bar"},
Compression: tc.compression,
}
err := c.Validate()
if tc.wantErr == "" {
assert.NoError(t, err)
} else {
assert.ErrorContains(t, err, tc.wantErr)
}
})
}
})
}

Expand Down
8 changes: 4 additions & 4 deletions filebeat/input/filestream/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func TestLogFileTimedClosing(t *testing.T) {

for _, tc := range testCases {
fs := filestream{
readerConfig: readerConfig{BufferSize: 512},
gzipExperimental: true}
readerConfig: readerConfig{BufferSize: 512},
compression: CompressionAuto}
f, err := fs.newFile(tc.createFile(t))
require.NoError(t, err,
"could not create file for reading")
Expand Down Expand Up @@ -151,8 +151,8 @@ func TestLogFileTruncated(t *testing.T) {
osFile := tc.createFile(t)

fs := filestream{
readerConfig: readerConfig{BufferSize: 512},
gzipExperimental: true}
readerConfig: readerConfig{BufferSize: 512},
compression: CompressionAuto}

f, err := fs.newFile(osFile)
require.NoError(t, err, "could not create file for reading")
Expand Down
17 changes: 11 additions & 6 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func newFileWatcher(
logger *logp.Logger,
paths []string,
config fileWatcherConfig,
gzipAllowed bool,
compression string,
sendNotChanged bool,
fi fileIdentifier,
srci *loginp.SourceIdentifier,
) (*fileWatcher, error) {

config.SendNotChanged = sendNotChanged
scanner, err := newFileScanner(logger, paths, config.Scanner, gzipAllowed)
scanner, err := newFileScanner(logger, paths, config.Scanner, compression)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -401,16 +401,16 @@ type fileScanner struct {
log *logp.Logger
hasher hash.Hash
readBuffer []byte
gzipAllowed bool
compression string
}

func newFileScanner(logger *logp.Logger, paths []string, config fileScannerConfig, gzipAllowed bool) (*fileScanner, error) {
func newFileScanner(logger *logp.Logger, paths []string, config fileScannerConfig, compression string) (*fileScanner, error) {
s := fileScanner{
paths: paths,
cfg: config,
log: logger.Named(scannerDebugKey),
hasher: sha256.New(),
gzipAllowed: gzipAllowed,
compression: compression,
}

if s.cfg.Fingerprint.Enabled {
Expand Down Expand Up @@ -620,7 +620,12 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript
}
defer osFile.Close()

if s.gzipAllowed {
switch s.compression {
case CompressionNone:
// fd.GZIP stays false
case CompressionGZIP:
fd.GZIP = true
case CompressionAuto:
fd.GZIP, err = IsGZIP(osFile)
if err != nil {
return fd, fmt.Errorf("failed to check if %q is gzip: %w",
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/fswatch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

func TestFileWatcherNotifications(t *testing.T) {
testCases := map[string]func(t *testing.T, fw *fileWatcher, evt loginp.FSEvent, dir, logFilePath string){
"Partially ingested file": func(t *testing.T, fw *fileWatcher, evt loginp.FSEvent, dir, logFilePath string) {

Check failure on line 34 in filebeat/input/filestream/fswatch_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA4009: argument evt is overwritten before first use (staticcheck)
// Tests the case:
// - watch runs and sees a new file, it sends a create event
// - data is added to the file
Expand All @@ -43,7 +43,7 @@
// Write to the file, so we get a write operation
integration.WriteLogFile(t, logFilePath, 10, true)
fw.watch(t.Context())
evt = <-fw.events

Check failure on line 46 in filebeat/input/filestream/fswatch_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA4009(related information): assignment to evt (staticcheck)
requireOperation(t, evt, loginp.OpWrite)

// Check the filewatcher state
Expand Down Expand Up @@ -187,7 +187,7 @@
logptest.NewFileLogger(t, filepath.Join(dir, "logger")).Logger,
[]string{filepath.Join(dir, "*.log")},
cfg,
false,
CompressionNone,
false,
mustFingerprintIdentifier(),
mustSourceIdentifier("foo-id"),
Expand Down
30 changes: 15 additions & 15 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,10 @@ func TestFileScanner(t *testing.T) {
require.NoError(t, err)

cases := []struct {
name string
cfgStr string
gzip bool
expDesc map[string]loginp.FileDescriptor
name string
cfgStr string
compression string
expDesc map[string]loginp.FileDescriptor
}{
{
name: "returns all files when no limits, not including the repeated symlink",
Expand Down Expand Up @@ -823,8 +823,8 @@ scanner:
},
},
{
name: "returns all files except too small to fingerprint",
gzip: true,
name: "returns all files except too small to fingerprint",
compression: CompressionAuto,
cfgStr: `
scanner:
symlinks: true
Expand Down Expand Up @@ -929,7 +929,7 @@ scanner:
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
logger := logptest.NewTestingLogger(t, "")
s := createScannerWithConfig(t, logger, paths, tc.cfgStr, tc.gzip)
s := createScannerWithConfig(t, logger, paths, tc.cfgStr, tc.compression)
requireEqualFiles(t, tc.expDesc, s.GetFiles())
})
}
Expand All @@ -946,7 +946,7 @@ scanner:

// the glob for the very small files
paths := []string{filepath.Join(dir, undersizedGlob)}
s := createScannerWithConfig(t, logger, paths, cfgStr, false)
s := createScannerWithConfig(t, logger, paths, cfgStr, CompressionNone)
files := s.GetFiles()
require.Empty(t, files)

Expand Down Expand Up @@ -995,7 +995,7 @@ scanner:
logptest.NewTestingLogger(t, ""),
paths,
cfg,
false,
CompressionNone,
false,
mustPathIdentifier(false),
mustSourceIdentifier("foo-id"),
Expand Down Expand Up @@ -1039,7 +1039,7 @@ func BenchmarkGetFiles(b *testing.B) {
Enabled: false,
},
}
s, err := newFileScanner(logp.NewNopLogger(), paths, cfg, false)
s, err := newFileScanner(logp.NewNopLogger(), paths, cfg, CompressionNone)
require.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func BenchmarkGetFilesWithFingerprint(b *testing.B) {
},
}

s, err := newFileScanner(logp.NewNopLogger(), paths, cfg, false)
s, err := newFileScanner(logp.NewNopLogger(), paths, cfg, CompressionNone)
require.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand All @@ -1092,7 +1092,7 @@ func createWatcherWithConfig(t *testing.T, logger *logp.Logger, paths []string,
logger,
paths,
tmpCfg.Scaner,
false,
CompressionNone,
false,
mustPathIdentifier(false),
mustSourceIdentifier("foo-id"),
Expand All @@ -1102,7 +1102,7 @@ func createWatcherWithConfig(t *testing.T, logger *logp.Logger, paths []string,
return fw
}

func createScannerWithConfig(t *testing.T, logger *logp.Logger, paths []string, cfgStr string, gzipAllowed bool) loginp.FSScanner {
func createScannerWithConfig(t *testing.T, logger *logp.Logger, paths []string, cfgStr string, compression string) loginp.FSScanner {
cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr)
require.NoError(t, err)

Expand All @@ -1113,7 +1113,7 @@ func createScannerWithConfig(t *testing.T, logger *logp.Logger, paths []string,
config := defaultFileWatcherConfig()
err = ns.Config().Unpack(&config)
require.NoError(t, err)
scanner, err := newFileScanner(logger, paths, config.Scanner, gzipAllowed)
scanner, err := newFileScanner(logger, paths, config.Scanner, compression)
require.NoError(t, err)

return scanner
Expand Down Expand Up @@ -1170,7 +1170,7 @@ func BenchmarkToFileDescriptor(b *testing.B) {
},
}

s, err := newFileScanner(logp.NewNopLogger(), paths, cfg, false)
s, err := newFileScanner(logp.NewNopLogger(), paths, cfg, CompressionNone)
require.NoError(b, err)

it, err := s.getIngestTarget(filename)
Expand Down
Loading
Loading