Skip to content
92 changes: 62 additions & 30 deletions libbeat/processors/script/javascript/javascript.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ import (
type jsProcessor struct {
Config
sessionPool *sessionPool
sourceProg *goja.Program
sourceFile string
stats *processorStats
logger *logp.Logger
}

// New constructs a new JavaScript processor.
Expand All @@ -59,50 +59,60 @@ func New(c *config.C, log *logp.Logger) (beat.Processor, error) {

// NewFromConfig constructs a new JavaScript processor from the given config
// object. It loads the sources, compiles them, and validates the entry point.
// For inline sources, initialization happens immediately. For file-based sources,
// initialization is deferred until SetPaths is called.
func NewFromConfig(c Config, reg *monitoring.Registry, logger *logp.Logger) (beat.Processor, error) {
err := c.Validate()
if err != nil {
return nil, err
}

var sourceFile string
var sourceCode string
switch {
case c.Source != "":
sourceFile = "inline.js"
sourceCode = c.Source
case c.File != "":
sourceFile, sourceCode, err = loadSources(c.File)
case len(c.Files) > 0:
sourceFile, sourceCode, err = loadSources(c.Files...)
processor := &jsProcessor{
Config: c,
logger: logger,
stats: getStats(c.Tag, reg, logger),
}
if err != nil {
return nil, annotateError(c.Tag, err)

// For inline sources, we can initialize immediately.
// For file-based sources, we defer initialization until SetPaths is called.
if c.Source != "" {
const inlineSourceFile = "inline.js"

err = processor.compile(inlineSourceFile, c.Source)
if err != nil {
return nil, err
}
}

// Validate processor source code.
prog, err := goja.Compile(sourceFile, sourceCode, true)
if err != nil {
return nil, err
return processor, nil
}

// SetPaths initializes the processor with the provided paths configuration.
// This method must be called before the processor can be used for file-based sources.
func (p *jsProcessor) SetPaths(path *paths.Path) error {
if p.Source != "" {
return nil // inline source already set
}

pool, err := newSessionPool(prog, c, logger)
var sourceFile string
var sourceCode string
var err error

switch {
case p.File != "":
sourceFile, sourceCode, err = loadSources(path, p.File)
case len(p.Files) > 0:
sourceFile, sourceCode, err = loadSources(path, p.Files...)
}
if err != nil {
return nil, annotateError(c.Tag, err)
return annotateError(p.Tag, err)
}

return &jsProcessor{
Config: c,
sessionPool: pool,
sourceProg: prog,
sourceFile: sourceFile,
stats: getStats(c.Tag, reg, logger),
}, nil
return p.compile(sourceFile, sourceCode)
}

// loadSources loads javascript source from files.
func loadSources(files ...string) (string, string, error) {
var sources []string
// loadSources loads javascript source from files using the provided paths.
func loadSources(pathConfig *paths.Path, files ...string) (string, string, error) {
buf := new(bytes.Buffer)

readFile := func(path string) error {
Expand All @@ -124,8 +134,9 @@ func loadSources(files ...string) (string, string, error) {
return nil
}

sources := make([]string, 0, len(files))
for _, filePath := range files {
filePath = paths.Resolve(paths.Config, filePath)
filePath = pathConfig.Resolve(paths.Config, filePath)

if hasMeta(filePath) {
matches, err := filepath.Glob(filePath)
Expand Down Expand Up @@ -162,9 +173,30 @@ func annotateError(id string, err error) error {
return fmt.Errorf("failed in processor.javascript: %w", err)
}

func (p *jsProcessor) compile(sourceFile, sourceCode string) error {
// Validate processor source code.
prog, err := goja.Compile(sourceFile, sourceCode, true)
if err != nil {
return err
}

pool, err := newSessionPool(prog, p.Config, p.logger)
if err != nil {
return annotateError(p.Tag, err)
}

p.sessionPool = pool
p.sourceFile = sourceFile
return nil
}

// Run executes the processor on the given it event. It invokes the
// process function defined in the JavaScript source.
func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) {
if p.sessionPool == nil {
return event, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources")
}

s := p.sessionPool.Get()
defer p.sessionPool.Put(s)

Expand Down
114 changes: 104 additions & 10 deletions libbeat/processors/script/javascript/javascript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/paths"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -69,19 +70,34 @@ func TestNew(t *testing.T) {
require.ErrorContains(t, err, "process function not found")
})

t.Run("file not found", func(t *testing.T) {
cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nonexistent.js")})
t.Run("SetPaths file not found", func(t *testing.T) {
cfg, err := config.NewConfigFrom(map[string]any{"file": "nonexistent.js"})
require.NoError(t, err)

_, err = New(cfg, logptest.NewTestingLogger(t, ""))
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
require.NoError(t, err) // Construction succeeds

jsProc, ok := p.(*jsProcessor)
require.True(t, ok)

// SetPaths should fail
err = jsProc.SetPaths(tmpPaths(tmpDir))
require.ErrorContains(t, err, "no such file or directory")
})

t.Run("no sources found with glob", func(t *testing.T) {
cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nomatch", "*.js")})
t.Run("SetPaths no sources found with glob", func(t *testing.T) {
emptyDir := t.TempDir()
cfg, err := config.NewConfigFrom(map[string]any{"file": "nomatch/*.js"})
require.NoError(t, err)

_, err = New(cfg, logptest.NewTestingLogger(t, ""))
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
require.NoError(t, err) // Construction succeeds

jsProc, ok := p.(*jsProcessor)
require.True(t, ok)

// SetPaths should fail
err = jsProc.SetPaths(tmpPaths(emptyDir))
require.ErrorContains(t, err, "no sources were found")
})
}
Expand All @@ -102,9 +118,17 @@ func TestRun(t *testing.T) {

t.Run("with file", func(t *testing.T) {
file := writeFile(t, tmpDir, "processor.js", `function process(event) { event.Put("from_file", true); }`)
p := newTestProcessor(t, "file", file, "")
p := newTestProcessor(t, "file", filepath.Base(file), "")

evt := &beat.Event{Fields: mapstr.M{}}
// Try to use without SetPaths - should fail
evt, err := p.Run(newTestEvent())
assert.NotNil(t, evt)
assert.ErrorContains(t, err, "javascript processor not initialized")
assert.ErrorContains(t, err, "SetPaths must be called")

setPaths(t, p, tmpDir)

evt = &beat.Event{Fields: mapstr.M{}}
result, err := p.Run(evt)
require.NoError(t, err)

Expand All @@ -115,7 +139,9 @@ func TestRun(t *testing.T) {
t.Run("with multiple files", func(t *testing.T) {
utilFile := writeFile(t, tmpDir, "util.js", "var multiplier = 2;")
mainFile := writeFile(t, tmpDir, "main.js", `function process(event) { event.Put("multiplier", multiplier); }`)
p := newTestProcessor(t, "files", []string{utilFile, mainFile}, "")

p := newTestProcessor(t, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "")
setPaths(t, p, tmpDir)

evt := &beat.Event{Fields: mapstr.M{}}
result, err := p.Run(evt)
Expand All @@ -129,7 +155,9 @@ func TestRun(t *testing.T) {
globDir := t.TempDir()
writeFile(t, globDir, "a_utils.js", "var fromGlob = true;")
writeFile(t, globDir, "b_main.js", `function process(event) { event.Put("from_glob", fromGlob); }`)
p := newTestProcessor(t, "file", filepath.Join(globDir, "*.js"), "")

p := newTestProcessor(t, "file", "*.js", "")
setPaths(t, p, globDir)

evt := &beat.Event{Fields: mapstr.M{}}
result, err := p.Run(evt)
Expand All @@ -139,6 +167,17 @@ func TestRun(t *testing.T) {
v, _ := result.GetValue("from_glob")
assert.Equal(t, true, v)
})

t.Run("after SetPaths on inline source", func(t *testing.T) {
p := newTestProcessor(t, "source", `function process(event) { event.Put("x", 1); return event; }`, "")
setPaths(t, p, "/does/not/matter")

// Should still work
evt, err := p.Run(newTestEvent())
require.NoError(t, err)
v, _ := evt.GetValue("x")
assert.Equal(t, int64(1), v)
})
}

func TestRunWithStats(t *testing.T) {
Expand Down Expand Up @@ -181,6 +220,35 @@ func TestRunWithStats(t *testing.T) {
})
}

func TestSetPathsWithRelativePath(t *testing.T) {
tmpDir := t.TempDir()
scriptsDir := filepath.Join(tmpDir, "scripts")
err := os.MkdirAll(scriptsDir, 0755)
require.NoError(t, err)

writeFile(t, scriptsDir, "test.js", `function process(event) { event.Put("added", "value"); return event; }`)

p, err := NewFromConfig(Config{
File: "scripts/test.js",
}, nil, logptest.NewTestingLogger(t, ""))
require.NoError(t, err)

jsProc, ok := p.(*jsProcessor)
require.True(t, ok, "processor should be *jsProcessor")

// Initialize with paths where Config points to configDir
err = jsProc.SetPaths(tmpPaths(tmpDir))
require.NoError(t, err)

// Should resolve relative to the config/ directory
evt, err := jsProc.Run(newTestEvent())
require.NoError(t, err)

val, err := evt.GetValue("added")
require.NoError(t, err)
assert.Equal(t, "value", val)
}

func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Processor {
t.Helper()
cfg := map[string]any{key: value}
Expand All @@ -194,10 +262,36 @@ func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Proc
return p
}

func setPaths(t *testing.T, p beat.Processor, tmpDir string) {
t.Helper()
require.IsType(t, &jsProcessor{}, p)
jsProc, ok := p.(*jsProcessor)
require.True(t, ok, "expected *jsProcessor type")
err := jsProc.SetPaths(tmpPaths(tmpDir))
require.NoError(t, err)
}

func writeFile(t *testing.T, dir, name, contents string) string {
t.Helper()
path := filepath.Join(dir, name)
err := os.WriteFile(path, []byte(contents), 0o644)
require.NoErrorf(t, err, "failed to write to file %s", path)
return path
}

func newTestEvent() *beat.Event {
return &beat.Event{
Fields: mapstr.M{
"message": "test event",
},
}
}

func tmpPaths(dir string) *paths.Path {
return &paths.Path{
Home: dir,
Config: dir,
Data: dir,
Logs: dir,
}
}
32 changes: 30 additions & 2 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
}

// multiReceiverConfig creates a Config for testing multiple receivers.
// Each receiver gets a unique home path.
// Each receiver gets a unique home path and a JavaScript processor that loads from its own path.config directory.
func multiReceiverConfig(helper multiReceiverHelper) *Config {
return &Config{
Beatconfig: map[string]any{
Expand All @@ -170,6 +170,17 @@ func multiReceiverConfig(helper multiReceiverHelper) *Config {
"enabled": true,
"message": "test",
"count": 1,
// Each receiver gets a JavaScript processor that loads from its own
// path.config directory, adding a unique marker field to verify isolation.
"processors": []map[string]any{
{
"script": map[string]any{
"lang": "javascript",
"file": "processor.js",
"tag": "js-" + helper.jsMarker,
},
},
},
},
{
"type": "filestream",
Expand Down Expand Up @@ -197,14 +208,27 @@ type multiReceiverHelper struct {
name string
home string
ingest string
jsMarker string
monitorSocket string
}

func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
const (
scriptFormat = `function process(event) { event.Put("js_marker", %q); return event; }`
)

home := t.TempDir()

// Create JavaScript processor files in each receiver's home directory.
// Each script adds a unique marker field to verify path isolation.
jsMarker := fmt.Sprintf("receiver%d", number)
writeFile(t, filepath.Join(home, "processor.js"), fmt.Sprintf(scriptFormat, jsMarker))

return multiReceiverHelper{
name: fmt.Sprintf("r%d", number),
home: t.TempDir(),
home: home,
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
jsMarker: jsMarker,
monitorSocket: genSocketPath(t),
}
}
Expand Down Expand Up @@ -243,6 +267,10 @@ func TestMultipleReceivers(t *testing.T) {

assert.Equalf(c, "test", logs[helper.name][0].Flatten()["message"], "expected %v message field to be 'test'", helper)

// Verify that each receiver used its own JavaScript processor script.
// This demonstrates path isolation: each receiver loads processor.js from its own path.config.
assert.Equalf(c, helper.jsMarker, logs[helper.name][0].Flatten()["js_marker"], "expected %v to have js_marker from its own script", helper)

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
Expand Down
Loading