Skip to content

Commit e4a3930

Browse files
orestisflmergify[bot]
authored andcommitted
javascript processor: replace global paths with per-beat paths (#47870)
## Proposed commit message Changes: - Add SetPaths(path *paths.Path) method to jsProcessor that accepts a per-beat paths configuration - Defer file-based source initialization until SetPaths is called - For inline sources, initialization still happens immediately in NewFromConfig - loadSources now takes a *paths.Path parameter and uses pathConfig.Resolve() instead of the global paths.Resolve() Fixes #46988 ## How to test this PR locally Run filebeat with: ```yaml path.config: myconfig/ filebeat.inputs: - type: filestream id: input-a paths: - /tmp/logs/a.log processors: - script: lang: javascript file: test_processor.js tag: test-js-processor output.console: enabled: true ``` `myconfig/test_processor.js`: ```js function process(event) { event.Put("js_processor.processed", true); event.Put("js_processor.timestamp", new Date().toISOString()); var msg = event.Get("message"); if (msg) { event.Put("message_upper", msg.toUpperCase()); } return event; } ``` write some logs: ```sh yes 'some log' | head -n 10000 > /tmp/logs/a.log ``` See output: ```json { "@timestamp": "2025-12-02T18:09:36.488Z", "@metadata": { "beat": "filebeat", "type": "_doc", "version": "9.3.0" }, "ecs": { "version": "8.0.0" }, "log": { "offset": 89586, "file": { "path": "/tmp/logs/a.log", "device_id": "38", "inode": "73159", "fingerprint": "83016ba24a8d31ccb16d2230eabcb1f043fa4c65914339eb954619b5c13fd55a" } }, "message": "some log", "input": { "type": "filestream" }, "js_processor": { "processed": true, "timestamp": "2025-12-02T18:09:36.488Z" }, "message_upper": "SOME LOG", "host": { "name": "laptop" }, "agent": { "version": "9.3.0", "ephemeral_id": "f6bb5c64-d2d1-4dad-a0e1-bbc553b617ec", "id": "40aef7d0-efcb-4613-a1df-d9bc42ff36b9", "name": "laptop", "type": "filebeat" } } ``` ## Related issues - Closes #46988 - Relates #47353 (cherry picked from commit 2179473)
1 parent 6e0b5e0 commit e4a3930

File tree

4 files changed

+219
-42
lines changed

4 files changed

+219
-42
lines changed

libbeat/processors/script/javascript/javascript.go

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ import (
4242
type jsProcessor struct {
4343
Config
4444
sessionPool *sessionPool
45-
sourceProg *goja.Program
4645
sourceFile string
4746
stats *processorStats
47+
logger *logp.Logger
4848
}
4949

5050
// New constructs a new JavaScript processor.
@@ -59,50 +59,60 @@ func New(c *config.C, log *logp.Logger) (beat.Processor, error) {
5959

6060
// NewFromConfig constructs a new JavaScript processor from the given config
6161
// object. It loads the sources, compiles them, and validates the entry point.
62+
// For inline sources, initialization happens immediately. For file-based sources,
63+
// initialization is deferred until SetPaths is called.
6264
func NewFromConfig(c Config, reg *monitoring.Registry, logger *logp.Logger) (beat.Processor, error) {
6365
err := c.Validate()
6466
if err != nil {
6567
return nil, err
6668
}
6769

68-
var sourceFile string
69-
var sourceCode string
70-
switch {
71-
case c.Source != "":
72-
sourceFile = "inline.js"
73-
sourceCode = c.Source
74-
case c.File != "":
75-
sourceFile, sourceCode, err = loadSources(c.File)
76-
case len(c.Files) > 0:
77-
sourceFile, sourceCode, err = loadSources(c.Files...)
70+
processor := &jsProcessor{
71+
Config: c,
72+
logger: logger,
73+
stats: getStats(c.Tag, reg, logger),
7874
}
79-
if err != nil {
80-
return nil, annotateError(c.Tag, err)
75+
76+
// For inline sources, we can initialize immediately.
77+
// For file-based sources, we defer initialization until SetPaths is called.
78+
if c.Source != "" {
79+
const inlineSourceFile = "inline.js"
80+
81+
err = processor.compile(inlineSourceFile, c.Source)
82+
if err != nil {
83+
return nil, err
84+
}
8185
}
8286

83-
// Validate processor source code.
84-
prog, err := goja.Compile(sourceFile, sourceCode, true)
85-
if err != nil {
86-
return nil, err
87+
return processor, nil
88+
}
89+
90+
// SetPaths initializes the processor with the provided paths configuration.
91+
// This method must be called before the processor can be used for file-based sources.
92+
func (p *jsProcessor) SetPaths(path *paths.Path) error {
93+
if p.Source != "" {
94+
return nil // inline source already set
8795
}
8896

89-
pool, err := newSessionPool(prog, c, logger)
97+
var sourceFile string
98+
var sourceCode string
99+
var err error
100+
101+
switch {
102+
case p.File != "":
103+
sourceFile, sourceCode, err = loadSources(path, p.File)
104+
case len(p.Files) > 0:
105+
sourceFile, sourceCode, err = loadSources(path, p.Files...)
106+
}
90107
if err != nil {
91-
return nil, annotateError(c.Tag, err)
108+
return annotateError(p.Tag, err)
92109
}
93110

94-
return &jsProcessor{
95-
Config: c,
96-
sessionPool: pool,
97-
sourceProg: prog,
98-
sourceFile: sourceFile,
99-
stats: getStats(c.Tag, reg, logger),
100-
}, nil
111+
return p.compile(sourceFile, sourceCode)
101112
}
102113

103-
// loadSources loads javascript source from files.
104-
func loadSources(files ...string) (string, string, error) {
105-
var sources []string
114+
// loadSources loads javascript source from files using the provided paths.
115+
func loadSources(pathConfig *paths.Path, files ...string) (string, string, error) {
106116
buf := new(bytes.Buffer)
107117

108118
readFile := func(path string) error {
@@ -124,8 +134,9 @@ func loadSources(files ...string) (string, string, error) {
124134
return nil
125135
}
126136

137+
sources := make([]string, 0, len(files))
127138
for _, filePath := range files {
128-
filePath = paths.Resolve(paths.Config, filePath)
139+
filePath = pathConfig.Resolve(paths.Config, filePath)
129140

130141
if hasMeta(filePath) {
131142
matches, err := filepath.Glob(filePath)
@@ -162,9 +173,30 @@ func annotateError(id string, err error) error {
162173
return fmt.Errorf("failed in processor.javascript: %w", err)
163174
}
164175

176+
func (p *jsProcessor) compile(sourceFile, sourceCode string) error {
177+
// Validate processor source code.
178+
prog, err := goja.Compile(sourceFile, sourceCode, true)
179+
if err != nil {
180+
return err
181+
}
182+
183+
pool, err := newSessionPool(prog, p.Config, p.logger)
184+
if err != nil {
185+
return annotateError(p.Tag, err)
186+
}
187+
188+
p.sessionPool = pool
189+
p.sourceFile = sourceFile
190+
return nil
191+
}
192+
165193
// Run executes the processor on the given it event. It invokes the
166194
// process function defined in the JavaScript source.
167195
func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) {
196+
if p.sessionPool == nil {
197+
return event, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources")
198+
}
199+
168200
s := p.sessionPool.Get()
169201
defer p.sessionPool.Put(s)
170202

libbeat/processors/script/javascript/javascript_test.go

Lines changed: 104 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/elastic/elastic-agent-libs/logp/logptest"
3232
"github.com/elastic/elastic-agent-libs/mapstr"
3333
"github.com/elastic/elastic-agent-libs/monitoring"
34+
"github.com/elastic/elastic-agent-libs/paths"
3435
)
3536

3637
func TestNew(t *testing.T) {
@@ -69,19 +70,34 @@ func TestNew(t *testing.T) {
6970
require.ErrorContains(t, err, "process function not found")
7071
})
7172

72-
t.Run("file not found", func(t *testing.T) {
73-
cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nonexistent.js")})
73+
t.Run("SetPaths file not found", func(t *testing.T) {
74+
cfg, err := config.NewConfigFrom(map[string]any{"file": "nonexistent.js"})
7475
require.NoError(t, err)
7576

76-
_, err = New(cfg, logptest.NewTestingLogger(t, ""))
77+
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
78+
require.NoError(t, err) // Construction succeeds
79+
80+
jsProc, ok := p.(*jsProcessor)
81+
require.True(t, ok)
82+
83+
// SetPaths should fail
84+
err = jsProc.SetPaths(tmpPaths(tmpDir))
7785
require.ErrorContains(t, err, "no such file or directory")
7886
})
7987

80-
t.Run("no sources found with glob", func(t *testing.T) {
81-
cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nomatch", "*.js")})
88+
t.Run("SetPaths no sources found with glob", func(t *testing.T) {
89+
emptyDir := t.TempDir()
90+
cfg, err := config.NewConfigFrom(map[string]any{"file": "nomatch/*.js"})
8291
require.NoError(t, err)
8392

84-
_, err = New(cfg, logptest.NewTestingLogger(t, ""))
93+
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
94+
require.NoError(t, err) // Construction succeeds
95+
96+
jsProc, ok := p.(*jsProcessor)
97+
require.True(t, ok)
98+
99+
// SetPaths should fail
100+
err = jsProc.SetPaths(tmpPaths(emptyDir))
85101
require.ErrorContains(t, err, "no sources were found")
86102
})
87103
}
@@ -102,9 +118,17 @@ func TestRun(t *testing.T) {
102118

103119
t.Run("with file", func(t *testing.T) {
104120
file := writeFile(t, tmpDir, "processor.js", `function process(event) { event.Put("from_file", true); }`)
105-
p := newTestProcessor(t, "file", file, "")
121+
p := newTestProcessor(t, "file", filepath.Base(file), "")
106122

107-
evt := &beat.Event{Fields: mapstr.M{}}
123+
// Try to use without SetPaths - should fail
124+
evt, err := p.Run(newTestEvent())
125+
assert.NotNil(t, evt)
126+
assert.ErrorContains(t, err, "javascript processor not initialized")
127+
assert.ErrorContains(t, err, "SetPaths must be called")
128+
129+
setPaths(t, p, tmpDir)
130+
131+
evt = &beat.Event{Fields: mapstr.M{}}
108132
result, err := p.Run(evt)
109133
require.NoError(t, err)
110134

@@ -115,7 +139,9 @@ func TestRun(t *testing.T) {
115139
t.Run("with multiple files", func(t *testing.T) {
116140
utilFile := writeFile(t, tmpDir, "util.js", "var multiplier = 2;")
117141
mainFile := writeFile(t, tmpDir, "main.js", `function process(event) { event.Put("multiplier", multiplier); }`)
118-
p := newTestProcessor(t, "files", []string{utilFile, mainFile}, "")
142+
143+
p := newTestProcessor(t, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "")
144+
setPaths(t, p, tmpDir)
119145

120146
evt := &beat.Event{Fields: mapstr.M{}}
121147
result, err := p.Run(evt)
@@ -129,7 +155,9 @@ func TestRun(t *testing.T) {
129155
globDir := t.TempDir()
130156
writeFile(t, globDir, "a_utils.js", "var fromGlob = true;")
131157
writeFile(t, globDir, "b_main.js", `function process(event) { event.Put("from_glob", fromGlob); }`)
132-
p := newTestProcessor(t, "file", filepath.Join(globDir, "*.js"), "")
158+
159+
p := newTestProcessor(t, "file", "*.js", "")
160+
setPaths(t, p, globDir)
133161

134162
evt := &beat.Event{Fields: mapstr.M{}}
135163
result, err := p.Run(evt)
@@ -139,6 +167,17 @@ func TestRun(t *testing.T) {
139167
v, _ := result.GetValue("from_glob")
140168
assert.Equal(t, true, v)
141169
})
170+
171+
t.Run("after SetPaths on inline source", func(t *testing.T) {
172+
p := newTestProcessor(t, "source", `function process(event) { event.Put("x", 1); return event; }`, "")
173+
setPaths(t, p, "/does/not/matter")
174+
175+
// Should still work
176+
evt, err := p.Run(newTestEvent())
177+
require.NoError(t, err)
178+
v, _ := evt.GetValue("x")
179+
assert.Equal(t, int64(1), v)
180+
})
142181
}
143182

144183
func TestRunWithStats(t *testing.T) {
@@ -181,6 +220,35 @@ func TestRunWithStats(t *testing.T) {
181220
})
182221
}
183222

223+
func TestSetPathsWithRelativePath(t *testing.T) {
224+
tmpDir := t.TempDir()
225+
scriptsDir := filepath.Join(tmpDir, "scripts")
226+
err := os.MkdirAll(scriptsDir, 0755)
227+
require.NoError(t, err)
228+
229+
writeFile(t, scriptsDir, "test.js", `function process(event) { event.Put("added", "value"); return event; }`)
230+
231+
p, err := NewFromConfig(Config{
232+
File: "scripts/test.js",
233+
}, nil, logptest.NewTestingLogger(t, ""))
234+
require.NoError(t, err)
235+
236+
jsProc, ok := p.(*jsProcessor)
237+
require.True(t, ok, "processor should be *jsProcessor")
238+
239+
// Initialize with paths where Config points to configDir
240+
err = jsProc.SetPaths(tmpPaths(tmpDir))
241+
require.NoError(t, err)
242+
243+
// Should resolve relative to the config/ directory
244+
evt, err := jsProc.Run(newTestEvent())
245+
require.NoError(t, err)
246+
247+
val, err := evt.GetValue("added")
248+
require.NoError(t, err)
249+
assert.Equal(t, "value", val)
250+
}
251+
184252
func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Processor {
185253
t.Helper()
186254
cfg := map[string]any{key: value}
@@ -194,10 +262,36 @@ func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Proc
194262
return p
195263
}
196264

265+
func setPaths(t *testing.T, p beat.Processor, tmpDir string) {
266+
t.Helper()
267+
require.IsType(t, &jsProcessor{}, p)
268+
jsProc, ok := p.(*jsProcessor)
269+
require.True(t, ok, "expected *jsProcessor type")
270+
err := jsProc.SetPaths(tmpPaths(tmpDir))
271+
require.NoError(t, err)
272+
}
273+
197274
func writeFile(t *testing.T, dir, name, contents string) string {
198275
t.Helper()
199276
path := filepath.Join(dir, name)
200277
err := os.WriteFile(path, []byte(contents), 0o644)
201278
require.NoErrorf(t, err, "failed to write to file %s", path)
202279
return path
203280
}
281+
282+
func newTestEvent() *beat.Event {
283+
return &beat.Event{
284+
Fields: mapstr.M{
285+
"message": "test event",
286+
},
287+
}
288+
}
289+
290+
func tmpPaths(dir string) *paths.Path {
291+
return &paths.Path{
292+
Home: dir,
293+
Config: dir,
294+
Data: dir,
295+
Logs: dir,
296+
}
297+
}

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
156156
}
157157

158158
// multiReceiverConfig creates a Config for testing multiple receivers.
159-
// Each receiver gets a unique home path.
159+
// Each receiver gets a unique home path and a JavaScript processor that loads from its own path.config directory.
160160
func multiReceiverConfig(helper multiReceiverHelper) *Config {
161161
return &Config{
162162
Beatconfig: map[string]any{
@@ -167,6 +167,17 @@ func multiReceiverConfig(helper multiReceiverHelper) *Config {
167167
"enabled": true,
168168
"message": "test",
169169
"count": 1,
170+
// Each receiver gets a JavaScript processor that loads from its own
171+
// path.config directory, adding a unique marker field to verify isolation.
172+
"processors": []map[string]any{
173+
{
174+
"script": map[string]any{
175+
"lang": "javascript",
176+
"file": "processor.js",
177+
"tag": "js-" + helper.jsMarker,
178+
},
179+
},
180+
},
170181
},
171182
{
172183
"type": "filestream",
@@ -194,14 +205,27 @@ type multiReceiverHelper struct {
194205
name string
195206
home string
196207
ingest string
208+
jsMarker string
197209
monitorSocket string
198210
}
199211

200212
func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
213+
const (
214+
scriptFormat = `function process(event) { event.Put("js_marker", %q); return event; }`
215+
)
216+
217+
home := t.TempDir()
218+
219+
// Create JavaScript processor files in each receiver's home directory.
220+
// Each script adds a unique marker field to verify path isolation.
221+
jsMarker := fmt.Sprintf("receiver%d", number)
222+
writeFile(t, filepath.Join(home, "processor.js"), fmt.Sprintf(scriptFormat, jsMarker))
223+
201224
return multiReceiverHelper{
202225
name: fmt.Sprintf("r%d", number),
203-
home: t.TempDir(),
226+
home: home,
204227
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
228+
jsMarker: jsMarker,
205229
monitorSocket: genSocketPath(t),
206230
}
207231
}
@@ -241,6 +265,10 @@ func TestMultipleReceivers(t *testing.T) {
241265
assert.Equalf(c, "filebeatreceiver/"+helper.name, logs[helper.name][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in %v log record", helper)
242266
assert.Equalf(c, "receiver", logs[helper.name][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in %v log record", helper)
243267

268+
// Verify that each receiver used its own JavaScript processor script.
269+
// This demonstrates path isolation: each receiver loads processor.js from its own path.config.
270+
assert.Equalf(c, helper.jsMarker, logs[helper.name][0].Flatten()["js_marker"], "expected %v to have js_marker from its own script", helper)
271+
244272
// Make sure that each receiver has a separate logger
245273
// instance and does not interfere with others. Previously, the
246274
// logger in Beats was global, causing logger fields to be

0 commit comments

Comments
 (0)