Skip to content

Commit 249dd71

Browse files
committed
POC: filestream growing fingerprint identity
Add a proof-of-concept for a new "growing_fingerprint" file identity mode that addresses the limitation where files smaller than the fingerprint size (default 1024 bytes) cannot be tracked. ### Key changes: - Add growingFingerprintIdentifier that stores raw bytes (hex-encoded) instead of a hash, allowing the fingerprint to grow as the file grows - Files can be tracked immediately regardless of size (no minimum threshold) - Implement prefix matching: when a file grows, match the old (shorter) fingerprint as a prefix of the new (longer) fingerprint - Add IterateOnPrefix() and UpdateKey() to the store for registry key migration - Support in-place key updates without interrupting running harvesters - Default max_length is 1000 bytes (matching OTEL's filelog receiver) Configuration: ``` prospector.scanner: fingerprint.growing: true fingerprint.max_length: 1000 # optional, default 1000 file_identity.growing_fingerprint: ~ ``` This enables tracking small files that share initial content (e.g., common headers) by allowing their fingerprints to diverge as they grow with unique content. ### Includes integration tests covering: - Small files tracked immediately - Files with identical initial content differentiated as they grow - Fingerprint migration on file growth - Restart scenarios ### Open question/issue Using the raw bytes (hex-encoded) as the fingerprint makes it easier to compare prefix matches, however increases the memory consumption, up to 1000 bytes per file per in-memory instance of the fingerprint. Also, it increases the storage used by the registry on disk.
1 parent 061db97 commit 249dd71

File tree

12 files changed

+1245
-31
lines changed

12 files changed

+1245
-31
lines changed

filebeat/input/filestream/copytruncate_prospector.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,6 @@ func (p *copyTruncateFileProspector) onFSEvent(
264264
// check if the event belongs to a rotated file
265265
if p.isRotated(event) {
266266
log.Debugf("File %s is rotated", event.NewPath)
267-
268267
p.onRotatedFile(log, ctx, event, src, group)
269268

270269
} else {

filebeat/input/filestream/fswatch.go

Lines changed: 119 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"io"
2727
"os"
2828
"path/filepath"
29+
"strings"
2930
"sync"
3031
"time"
3132

@@ -39,10 +40,11 @@ import (
3940
)
4041

4142
const (
42-
RecursiveGlobDepth = 8
43-
DefaultFingerprintSize int64 = 1024 // 1KB
44-
scannerDebugKey = "scanner"
45-
watcherDebugKey = "file_watcher"
43+
RecursiveGlobDepth = 8
44+
DefaultFingerprintSize int64 = 1024 // 1KB
45+
DefaultGrowingFingerprintSize int64 = 1000 // Same as OTEL's filelog receiver
46+
scannerDebugKey = "scanner"
47+
watcherDebugKey = "file_watcher"
4648
)
4749

4850
var (
@@ -167,9 +169,20 @@ func (w *fileWatcher) processNotification(evt loginp.HarvesterStatus) {
167169
w.closedHarvestersMutex.Unlock()
168170
}
169171

172+
// AndersonQ: fsEvents created here.
173+
// here when f1 grwos -> same file / written
174+
//
175+
// f2 same "old" f1 fingerprint -> new file
170176
func (w *fileWatcher) watch(ctx unison.Canceler) {
171177
w.log.Debug("Start next scan")
172178

179+
prevKeys := make([]string, 0, len(w.prev))
180+
for k := range w.prev {
181+
prevKeys = append(prevKeys, k)
182+
}
183+
w.log.Debugf("Start next scan: prevs: %s", strings.Join(prevKeys, ","))
184+
185+
// file identity is updated in GetFiles
173186
paths := w.scanner.GetFiles()
174187

175188
// for debugging purposes
@@ -189,9 +202,17 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
189202

190203
// if the scanner found a new path or an existing path
191204
// with a different file, it is a new file
205+
// fileDescriptor key is the path
192206
prevDesc, ok := w.prev[path]
193207
sfd := fd // to avoid memory aliasing
194-
if !ok || !loginp.SameFile(&prevDesc, &sfd) {
208+
// AndersonQ: new, longer fingerprint makes it looks like a new file. It
209+
// has to remain the same file
210+
// If growing fingerprint is enabled, just the ID match isn't enough.
211+
if !ok || !loginp.SameFile(w.log, &prevDesc, &sfd) {
212+
if ok {
213+
w.log.Infof("file %q has been replaced by a new file. Old ID %q, new ID %q",
214+
path, prevDesc.FileID(), fd.FileID())
215+
}
195216
newFilesByName[path] = &sfd
196217
newFilesByID[fd.FileID()] = &sfd
197218
continue
@@ -244,6 +265,8 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
244265
// If a harvester for this file was closed recently,
245266
// we use its state instead of the one we have cached.
246267
case prevDesc.SizeOrBytesIngested() < fd.Info.Size():
268+
// AndersonQ: here, file with new/longer fingerprint has ti be
269+
// treated as the same file.
247270
e = writeEvent(path, fd, srcID)
248271
writtenCount++
249272

@@ -300,6 +323,8 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
300323
}
301324

302325
// remaining files in newFiles are newly created files
326+
// AndersonQ: when a file grows, it isn't a new file. Even though it has a
327+
// new ID
303328
for path, fd := range newFilesByName {
304329
// no need to react on empty new files
305330
if fd.Info.Size() == 0 {
@@ -371,6 +396,12 @@ type fingerprintConfig struct {
371396
Enabled bool `config:"enabled"`
372397
Offset int64 `config:"offset"`
373398
Length int64 `config:"length"`
399+
// Growing enables the growing fingerprint mode where the fingerprint
400+
// is the raw bytes (not a hash) and can grow as the file grows.
401+
Growing bool `config:"growing"`
402+
// MaxLength is the maximum number of bytes to use for the growing fingerprint.
403+
// Default is 1000 bytes (same as OTEL's filelog receiver).
404+
MaxLength int64 `config:"max_length"`
374405
}
375406

376407
type fileScannerConfig struct {
@@ -386,22 +417,25 @@ func defaultFileScannerConfig() fileScannerConfig {
386417
Symlinks: false,
387418
RecursiveGlob: true,
388419
Fingerprint: fingerprintConfig{
389-
Enabled: true,
390-
Offset: 0,
391-
Length: DefaultFingerprintSize,
420+
Enabled: true,
421+
Offset: 0,
422+
Length: DefaultFingerprintSize,
423+
Growing: false,
424+
MaxLength: DefaultGrowingFingerprintSize,
392425
},
393426
}
394427
}
395428

396429
// fileScanner looks for files which match the patterns in paths.
397430
// It is able to exclude files and symlinks.
398431
type fileScanner struct {
399-
paths []string
400-
cfg fileScannerConfig
401-
log *logp.Logger
402-
hasher hash.Hash
403-
readBuffer []byte
404-
gzipAllowed bool
432+
paths []string
433+
cfg fileScannerConfig
434+
log *logp.Logger
435+
hasher hash.Hash
436+
readBuffer []byte
437+
growingBuffer []byte // buffer for growing fingerprint mode
438+
gzipAllowed bool
405439
}
406440

407441
func newFileScanner(logger *logp.Logger, paths []string, config fileScannerConfig, gzipAllowed bool) (*fileScanner, error) {
@@ -413,7 +447,11 @@ func newFileScanner(logger *logp.Logger, paths []string, config fileScannerConfi
413447
gzipAllowed: gzipAllowed,
414448
}
415449

416-
if s.cfg.Fingerprint.Enabled {
450+
if s.cfg.Fingerprint.Growing {
451+
// Growing fingerprint mode: use raw bytes, no minimum size requirement
452+
s.log.Debugf("growing fingerprint mode enabled: max_length %d", s.cfg.Fingerprint.MaxLength)
453+
s.growingBuffer = make([]byte, s.cfg.Fingerprint.MaxLength)
454+
} else if s.cfg.Fingerprint.Enabled { // TODO(AndersonQ): it's confusing having cfg.Fingerprint.Enabled and cfg.Fingerprint.Growing meaning different things
417455
if s.cfg.Fingerprint.Length < sha256.BlockSize {
418456
err := fmt.Errorf("fingerprint size %d bytes cannot be smaller than %d bytes", config.Fingerprint.Length, sha256.BlockSize)
419457
return nil, fmt.Errorf("error while reading configuration of fingerprint: %w", err)
@@ -473,6 +511,7 @@ func (s *fileScanner) normalizeGlobPatterns() error {
473511

474512
// GetFiles returns a map of file descriptors by filenames that
475513
// match the configured paths.
514+
// AndersonQ: files are found here, duplicates are filtered out here
476515
func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
477516
fdByName := map[string]loginp.FileDescriptor{}
478517
// used to determine if a symlink resolves in a already known target
@@ -513,6 +552,8 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
513552
}
514553

515554
fileID := fd.FileID()
555+
// AndersonQ: was it a file that grew? It should not be relevant here
556+
// we're just getting the files we need to ingest.
516557
if knownFilename, exists := uniqueIDs[fileID]; exists {
517558
s.log.Warnf("%q points to an already known ingest target %q [%s==%s]. Skipping", fd.Filename, knownFilename, fileID, fileID)
518559
continue
@@ -609,6 +650,11 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript
609650
var osFile *os.File
610651
var file File
611652

653+
// Growing fingerprint mode: compute raw bytes fingerprint
654+
if s.cfg.Fingerprint.Growing {
655+
return s.computeGrowingFingerprint(it, fd)
656+
}
657+
612658
if !s.cfg.Fingerprint.Enabled {
613659
return fd, nil
614660
}
@@ -684,6 +730,64 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript
684730
return fd, nil
685731
}
686732

733+
// computeGrowingFingerprint computes a raw bytes fingerprint for the growing
734+
// fingerprint file identity. Unlike the hash-based fingerprint, this stores
735+
// the actual file content (hex-encoded) and can grow as the file grows.
736+
// Files of any size are supported - if the file is smaller than max_length,
737+
// the entire content is used as the fingerprint.
738+
func (s *fileScanner) computeGrowingFingerprint(it *ingestTarget, fd loginp.FileDescriptor) (loginp.FileDescriptor, error) {
739+
// Empty files have no fingerprint yet - empty string fingerprint
740+
if it.info.Size() == 0 {
741+
s.log.Info("fileScanner: computeGrowingFingerprint: size 0, nothing to compute", fd.Filename)
742+
return fd, nil
743+
}
744+
745+
osFile, err := os.Open(it.originalFilename)
746+
if err != nil {
747+
return fd, fmt.Errorf("failed to open %q for growing fingerprint: %w", it.originalFilename, err)
748+
}
749+
defer osFile.Close()
750+
751+
// Check for GZIP if allowed
752+
if s.gzipAllowed {
753+
fd.GZIP, err = IsGZIP(osFile)
754+
if err != nil {
755+
return fd, fmt.Errorf("failed to check if %q is gzip: %w", it.originalFilename, err)
756+
}
757+
}
758+
759+
// Determine how many bytes to read
760+
maxLength := s.cfg.Fingerprint.MaxLength
761+
fileSize := it.info.Size()
762+
// TODO(AndersonQ): If the file is GZIP-compressed, we cannot use the compressed size
763+
// it needs to read until maxLength or EOF
764+
toRead := fileSize
765+
if toRead > maxLength {
766+
toRead = maxLength
767+
}
768+
s.log.Infof("fileScanner: computeGrowingFingerprint: for file %s: %d/%d bytes",
769+
fd.Filename, toRead, maxLength)
770+
771+
var r io.Reader = osFile
772+
if fd.GZIP {
773+
// fingerprint is computed on decompressed data
774+
gzReader, err := newGzipSeekerReader(osFile, int(maxLength))
775+
if err != nil {
776+
return fd, fmt.Errorf("failed to create gzip reader for %q: %w", it.originalFilename, err)
777+
}
778+
defer gzReader.Close()
779+
r = gzReader
780+
}
781+
782+
n, err := io.ReadFull(r, s.growingBuffer[:toRead])
783+
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
784+
return fd, fmt.Errorf("failed to read %q for growing fingerprint: %w", it.originalFilename, err)
785+
}
786+
fd.Fingerprint = hex.EncodeToString(s.growingBuffer[:n])
787+
788+
return fd, nil
789+
}
790+
687791
func (s *fileScanner) isFileExcluded(file string) bool {
688792
return len(s.cfg.ExcludedFiles) > 0 && s.matchAny(s.cfg.ExcludedFiles, file)
689793
}

filebeat/input/filestream/identifier.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,22 @@ const (
3232
// IDs if a source is renamed.
3333
trackRename identifierFeature = iota
3434

35-
nativeName = "native"
36-
pathName = "path"
37-
inodeMarkerName = "inode_marker"
38-
fingerprintName = "fingerprint"
35+
nativeName = "native"
36+
pathName = "path"
37+
inodeMarkerName = "inode_marker"
38+
fingerprintName = "fingerprint"
39+
growingFingerprintName = "growing_fingerprint"
3940

4041
DefaultIdentifierName = nativeName
4142
identitySep = "::"
4243
)
4344

4445
var identifierFactories = map[string]identifierFactory{
45-
nativeName: newINodeDeviceIdentifier,
46-
pathName: newPathIdentifier,
47-
inodeMarkerName: newINodeMarkerIdentifier,
48-
fingerprintName: newFingerprintIdentifier,
46+
nativeName: newINodeDeviceIdentifier,
47+
pathName: newPathIdentifier,
48+
inodeMarkerName: newINodeMarkerIdentifier,
49+
fingerprintName: newFingerprintIdentifier,
50+
growingFingerprintName: newGrowingFingerprintIdentifier,
4951
}
5052

5153
type identifierFactory func(*conf.C, *logp.Logger) (fileIdentifier, error)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package filestream
19+
20+
import (
21+
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
22+
conf "github.com/elastic/elastic-agent-libs/config"
23+
"github.com/elastic/elastic-agent-libs/logp"
24+
)
25+
26+
// growingFingerprintIdentifier identifies files by their raw content fingerprint.
27+
// Unlike the hash-based fingerprint identifier, this stores the actual bytes
28+
// (hex-encoded) of the file header and the fingerprint can grow as the file grows.
29+
// This allows tracking files of any size immediately, without waiting for them
30+
// to reach a minimum size threshold.
31+
type growingFingerprintIdentifier struct{}
32+
33+
func newGrowingFingerprintIdentifier(_ *conf.C, _ *logp.Logger) (fileIdentifier, error) {
34+
return &growingFingerprintIdentifier{}, nil
35+
}
36+
37+
func (i *growingFingerprintIdentifier) GetSource(e loginp.FSEvent) fileSource {
38+
return fileSource{
39+
desc: e.Descriptor,
40+
newPath: e.NewPath,
41+
oldPath: e.OldPath,
42+
truncated: e.Op == loginp.OpTruncate,
43+
archived: e.Op == loginp.OpArchived,
44+
fileID: growingFingerprintName + identitySep + e.Descriptor.Fingerprint,
45+
identifierGenerator: growingFingerprintName,
46+
}
47+
}
48+
49+
func (i *growingFingerprintIdentifier) Name() string {
50+
return growingFingerprintName
51+
}
52+
53+
func (i *growingFingerprintIdentifier) Supports(f identifierFeature) bool {
54+
switch f {
55+
case trackRename:
56+
return true
57+
default:
58+
return false
59+
}
60+
}
61+
62+

filebeat/input/filestream/internal/input-logfile/fswatch.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package input_logfile
1919

2020
import (
21+
"strings"
22+
23+
"github.com/elastic/elastic-agent-libs/logp"
2124
"github.com/elastic/go-concert/unison"
2225

2326
"github.com/elastic/beats/v7/libbeat/common/file"
@@ -64,7 +67,9 @@ type FileDescriptor struct {
6467
Filename string
6568
// Info is the result of file stat
6669
Info file.ExtendedFileInfo
67-
// Fingerprint is a computed hash of the file header
70+
// Fingerprint is used for file identity. For the "fingerprint" identity,
71+
// this is a hash of the file header. For "growing_fingerprint", this is
72+
// the hex-encoded raw bytes of the file header (which can grow).
6873
Fingerprint string
6974
// GZIP indicates if the file is compressed with GZIP.
7075
GZIP bool
@@ -101,11 +106,34 @@ func (fd FileDescriptor) FileID() string {
101106
}
102107

103108
// SameFile returns true if descriptors point to the same file.
104-
func SameFile(a, b *FileDescriptor) bool {
105-
return a.FileID() == b.FileID()
109+
// For growing fingerprint identity, this handles the case where a file's
110+
// fingerprint has grown - checks if one fingerprint is a prefix of the other
111+
// and verifies it's the same physical file via OS state (inode/device).
112+
func SameFile(log *logp.Logger, prev, current *FileDescriptor) bool {
113+
// return prev.FileID() == current.FileID()
114+
115+
// Fast path: exact match
116+
if prev.FileID() == current.FileID() {
117+
return true
118+
}
119+
120+
// For growing fingerprint: check if one fingerprint is a prefix of the other
121+
// This happens when a file grows and its fingerprint expands
122+
if prev.Fingerprint != "" && current.Fingerprint != "" {
123+
124+
// If shorter is a prefix of longer, verify it's the same physical file
125+
same := strings.HasPrefix(current.Fingerprint, prev.Fingerprint) &&
126+
prev.Filename == current.Filename
127+
128+
log.Infof("SameFile: %t: prev=%s, current=%s. prevPath: %s, currPath: %s",
129+
same, prev.Fingerprint, current.Fingerprint, prev.Filename, current.Filename)
130+
return same
131+
}
132+
133+
return false
106134
}
107135

108-
// FSEvent returns inforamation about file system changes.
136+
// FSEvent returns information about file system changes.
109137
type FSEvent struct {
110138
// NewPath is the new path of the file.
111139
NewPath string

0 commit comments

Comments
 (0)