Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 62 additions & 30 deletions libbeat/processors/script/javascript/javascript.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ import (
type jsProcessor struct {
Config
sessionPool *sessionPool
sourceProg *goja.Program
sourceFile string
stats *processorStats
logger *logp.Logger
}

// New constructs a new JavaScript processor.
Expand All @@ -59,50 +59,60 @@ func New(c *config.C, log *logp.Logger) (beat.Processor, error) {

// NewFromConfig constructs a new JavaScript processor from the given config
// object. It loads the sources, compiles them, and validates the entry point.
// For inline sources, initialization happens immediately. For file-based sources,
// initialization is deferred until SetPaths is called.
func NewFromConfig(c Config, reg *monitoring.Registry, logger *logp.Logger) (beat.Processor, error) {
err := c.Validate()
if err != nil {
return nil, err
}

var sourceFile string
var sourceCode string
switch {
case c.Source != "":
sourceFile = "inline.js"
sourceCode = c.Source
case c.File != "":
sourceFile, sourceCode, err = loadSources(c.File)
case len(c.Files) > 0:
sourceFile, sourceCode, err = loadSources(c.Files...)
processor := &jsProcessor{
Config: c,
logger: logger,
stats: getStats(c.Tag, reg, logger),
}
if err != nil {
return nil, annotateError(c.Tag, err)

// For inline sources, we can initialize immediately.
// For file-based sources, we defer initialization until SetPaths is called.
if c.Source != "" {
const inlineSourceFile = "inline.js"

err = processor.compile(inlineSourceFile, c.Source)
if err != nil {
return nil, err
}
}

// Validate processor source code.
prog, err := goja.Compile(sourceFile, sourceCode, true)
if err != nil {
return nil, err
return processor, nil
}

// SetPaths initializes the processor with the provided paths configuration.
// This method must be called before the processor can be used for file-based sources.
func (p *jsProcessor) SetPaths(path *paths.Path) error {
if p.Source != "" {
return nil // inline source already set
}

pool, err := newSessionPool(prog, c, logger)
var sourceFile string
var sourceCode string
var err error

switch {
case p.File != "":
sourceFile, sourceCode, err = loadSources(path, p.File)
case len(p.Files) > 0:
sourceFile, sourceCode, err = loadSources(path, p.Files...)
}
if err != nil {
return nil, annotateError(c.Tag, err)
return annotateError(p.Tag, err)
}

return &jsProcessor{
Config: c,
sessionPool: pool,
sourceProg: prog,
sourceFile: sourceFile,
stats: getStats(c.Tag, reg, logger),
}, nil
return p.compile(sourceFile, sourceCode)
}

// loadSources loads javascript source from files.
func loadSources(files ...string) (string, string, error) {
var sources []string
// loadSources loads javascript source from files using the provided paths.
func loadSources(pathConfig *paths.Path, files ...string) (string, string, error) {
buf := new(bytes.Buffer)

readFile := func(path string) error {
Expand All @@ -124,8 +134,9 @@ func loadSources(files ...string) (string, string, error) {
return nil
}

sources := make([]string, 0, len(files))
for _, filePath := range files {
filePath = paths.Resolve(paths.Config, filePath)
filePath = pathConfig.Resolve(paths.Config, filePath)

if hasMeta(filePath) {
matches, err := filepath.Glob(filePath)
Expand Down Expand Up @@ -162,9 +173,30 @@ func annotateError(id string, err error) error {
return fmt.Errorf("failed in processor.javascript: %w", err)
}

func (p *jsProcessor) compile(sourceFile, sourceCode string) error {
// Validate processor source code.
prog, err := goja.Compile(sourceFile, sourceCode, true)
if err != nil {
return err
}

pool, err := newSessionPool(prog, p.Config, p.logger)
if err != nil {
return annotateError(p.Tag, err)
}

p.sessionPool = pool
p.sourceFile = sourceFile
return nil
}

// Run executes the processor on the given it event. It invokes the
// process function defined in the JavaScript source.
func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) {
if p.sessionPool == nil {
return event, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources")
}

s := p.sessionPool.Get()
defer p.sessionPool.Put(s)

Expand Down
114 changes: 104 additions & 10 deletions libbeat/processors/script/javascript/javascript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/paths"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -69,19 +70,34 @@ func TestNew(t *testing.T) {
require.ErrorContains(t, err, "process function not found")
})

t.Run("file not found", func(t *testing.T) {
cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nonexistent.js")})
t.Run("SetPaths file not found", func(t *testing.T) {
cfg, err := config.NewConfigFrom(map[string]any{"file": "nonexistent.js"})
require.NoError(t, err)

_, err = New(cfg, logptest.NewTestingLogger(t, ""))
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
require.NoError(t, err) // Construction succeeds

jsProc, ok := p.(*jsProcessor)
require.True(t, ok)

// SetPaths should fail
err = jsProc.SetPaths(tmpPaths(tmpDir))
require.ErrorContains(t, err, "no such file or directory")
})

t.Run("no sources found with glob", func(t *testing.T) {
cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nomatch", "*.js")})
t.Run("SetPaths no sources found with glob", func(t *testing.T) {
emptyDir := t.TempDir()
cfg, err := config.NewConfigFrom(map[string]any{"file": "nomatch/*.js"})
require.NoError(t, err)

_, err = New(cfg, logptest.NewTestingLogger(t, ""))
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
require.NoError(t, err) // Construction succeeds

jsProc, ok := p.(*jsProcessor)
require.True(t, ok)

// SetPaths should fail
err = jsProc.SetPaths(tmpPaths(emptyDir))
require.ErrorContains(t, err, "no sources were found")
})
}
Expand All @@ -102,9 +118,17 @@ func TestRun(t *testing.T) {

t.Run("with file", func(t *testing.T) {
file := writeFile(t, tmpDir, "processor.js", `function process(event) { event.Put("from_file", true); }`)
p := newTestProcessor(t, "file", file, "")
p := newTestProcessor(t, "file", filepath.Base(file), "")

evt := &beat.Event{Fields: mapstr.M{}}
// Try to use without SetPaths - should fail
evt, err := p.Run(newTestEvent())
assert.NotNil(t, evt)
assert.ErrorContains(t, err, "javascript processor not initialized")
assert.ErrorContains(t, err, "SetPaths must be called")

setPaths(t, p, tmpDir)

evt = &beat.Event{Fields: mapstr.M{}}
result, err := p.Run(evt)
require.NoError(t, err)

Expand All @@ -115,7 +139,9 @@ func TestRun(t *testing.T) {
t.Run("with multiple files", func(t *testing.T) {
utilFile := writeFile(t, tmpDir, "util.js", "var multiplier = 2;")
mainFile := writeFile(t, tmpDir, "main.js", `function process(event) { event.Put("multiplier", multiplier); }`)
p := newTestProcessor(t, "files", []string{utilFile, mainFile}, "")

p := newTestProcessor(t, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "")
setPaths(t, p, tmpDir)

evt := &beat.Event{Fields: mapstr.M{}}
result, err := p.Run(evt)
Expand All @@ -129,7 +155,9 @@ func TestRun(t *testing.T) {
globDir := t.TempDir()
writeFile(t, globDir, "a_utils.js", "var fromGlob = true;")
writeFile(t, globDir, "b_main.js", `function process(event) { event.Put("from_glob", fromGlob); }`)
p := newTestProcessor(t, "file", filepath.Join(globDir, "*.js"), "")

p := newTestProcessor(t, "file", "*.js", "")
setPaths(t, p, globDir)

evt := &beat.Event{Fields: mapstr.M{}}
result, err := p.Run(evt)
Expand All @@ -139,6 +167,17 @@ func TestRun(t *testing.T) {
v, _ := result.GetValue("from_glob")
assert.Equal(t, true, v)
})

t.Run("after SetPaths on inline source", func(t *testing.T) {
p := newTestProcessor(t, "source", `function process(event) { event.Put("x", 1); return event; }`, "")
setPaths(t, p, "/does/not/matter")

// Should still work
evt, err := p.Run(newTestEvent())
require.NoError(t, err)
v, _ := evt.GetValue("x")
assert.Equal(t, int64(1), v)
})
}

func TestRunWithStats(t *testing.T) {
Expand Down Expand Up @@ -181,6 +220,35 @@ func TestRunWithStats(t *testing.T) {
})
}

func TestSetPathsWithRelativePath(t *testing.T) {
tmpDir := t.TempDir()
scriptsDir := filepath.Join(tmpDir, "scripts")
err := os.MkdirAll(scriptsDir, 0755)
require.NoError(t, err)

writeFile(t, scriptsDir, "test.js", `function process(event) { event.Put("added", "value"); return event; }`)

p, err := NewFromConfig(Config{
File: "scripts/test.js",
}, nil, logptest.NewTestingLogger(t, ""))
require.NoError(t, err)

jsProc, ok := p.(*jsProcessor)
require.True(t, ok, "processor should be *jsProcessor")

// Initialize with paths where Config points to configDir
err = jsProc.SetPaths(tmpPaths(tmpDir))
require.NoError(t, err)

// Should resolve relative to the config/ directory
evt, err := jsProc.Run(newTestEvent())
require.NoError(t, err)

val, err := evt.GetValue("added")
require.NoError(t, err)
assert.Equal(t, "value", val)
}

func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Processor {
t.Helper()
cfg := map[string]any{key: value}
Expand All @@ -194,10 +262,36 @@ func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Proc
return p
}

func setPaths(t *testing.T, p beat.Processor, tmpDir string) {
t.Helper()
require.IsType(t, &jsProcessor{}, p)
jsProc, ok := p.(*jsProcessor)
require.True(t, ok, "expected *jsProcessor type")
err := jsProc.SetPaths(tmpPaths(tmpDir))
require.NoError(t, err)
}

func writeFile(t *testing.T, dir, name, contents string) string {
t.Helper()
path := filepath.Join(dir, name)
err := os.WriteFile(path, []byte(contents), 0o644)
require.NoErrorf(t, err, "failed to write to file %s", path)
return path
}

func newTestEvent() *beat.Event {
return &beat.Event{
Fields: mapstr.M{
"message": "test event",
},
}
}

func tmpPaths(dir string) *paths.Path {
return &paths.Path{
Home: dir,
Config: dir,
Data: dir,
Logs: dir,
}
}
32 changes: 30 additions & 2 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
}

// multiReceiverConfig creates a Config for testing multiple receivers.
// Each receiver gets a unique home path.
// Each receiver gets a unique home path and a JavaScript processor that loads from its own path.config directory.
func multiReceiverConfig(helper multiReceiverHelper) *Config {
return &Config{
Beatconfig: map[string]any{
Expand All @@ -176,6 +176,17 @@ func multiReceiverConfig(helper multiReceiverHelper) *Config {
"enabled": true,
"message": "test",
"count": 1,
// Each receiver gets a JavaScript processor that loads from its own
// path.config directory, adding a unique marker field to verify isolation.
"processors": []map[string]any{
{
"script": map[string]any{
"lang": "javascript",
"file": "processor.js",
"tag": "js-" + helper.jsMarker,
},
},
},
},
{
"type": "filestream",
Expand Down Expand Up @@ -206,14 +217,27 @@ type multiReceiverHelper struct {
name string
home string
ingest string
jsMarker string
monitorSocket string
}

func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
const (
scriptFormat = `function process(event) { event.Put("js_marker", %q); return event; }`
)

home := t.TempDir()

// Create JavaScript processor files in each receiver's home directory.
// Each script adds a unique marker field to verify path isolation.
jsMarker := fmt.Sprintf("receiver%d", number)
writeFile(t, filepath.Join(home, "processor.js"), fmt.Sprintf(scriptFormat, jsMarker))

return multiReceiverHelper{
name: fmt.Sprintf("r%d", number),
home: t.TempDir(),
home: home,
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
jsMarker: jsMarker,
monitorSocket: genSocketPath(t),
}
}
Expand Down Expand Up @@ -252,6 +276,10 @@ func TestMultipleReceivers(t *testing.T) {

assert.Equalf(c, "test", logs[helper.name][0].Flatten()["message"], "expected %v message field to be 'test'", helper)

// Verify that each receiver used its own JavaScript processor script.
// This demonstrates path isolation: each receiver loads processor.js from its own path.config.
assert.Equalf(c, helper.jsMarker, logs[helper.name][0].Flatten()["js_marker"], "expected %v to have js_marker from its own script", helper)

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
Expand Down
Loading