Skip to content

Commit

Permalink
Add support for variables in outputs and default provider (#6602) (#6754
Browse files Browse the repository at this point in the history
)

Adds support for context variables (ONLY) in outputs. Adds support for a default provide prefix to be defined for variables (default is env). This provides support for the original ${ES_PASSWORD} when used in outputs to still work as it will automatically map that to ${env.ES_PASSWORD} and it will get resolved for the context variables the same as it was being done by go-ucfg.

This includes an improvement to how variables are observed in the composable controller and how it is used by the coordinator. Now when a set of observable's are passed to the composable controller it will return the current set of variables after the debounce time, this ensures that before the variables are substituted that it is using the latest set of variables. Without this change running would always show an error at first with ${env.ES_PASSWORD} is an unknown variable and then less than a few milliseconds it would find it. This change removes that behavior and is able to find the variable on initial render.

(cherry picked from commit b59d51a)

Co-authored-by: Blake Rouse <[email protected]>
  • Loading branch information
mergify[bot] and blakerouse authored Feb 11, 2025
1 parent d80d79d commit e4c8fb6
Show file tree
Hide file tree
Showing 31 changed files with 667 additions and 146 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add context variable support to outputs

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Adds support for using context variable providers in the outputs section of a policy. Includes fallback support
to reference env provider when no provider prefix is provided in the variable.
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6602

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6376
62 changes: 46 additions & 16 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ type ConfigManager interface {
type VarsManager interface {
Runner

// DefaultProvider returns the default provider that the variable manager is configured to use.
DefaultProvider() string

// Observe instructs the variables to observe.
Observe([]string)
Observe(context.Context, []string) ([]*transpiler.Vars, error)

// Watch returns the chanel to watch for variable changes.
Watch() <-chan []*transpiler.Vars
Expand Down Expand Up @@ -1244,7 +1247,11 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config
}

// pass the observed vars from the AST to the varsMgr
c.observeASTVars()
err = c.observeASTVars(ctx)
if err != nil {
// only possible error here is the context being cancelled
return err
}

// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501
Expand Down Expand Up @@ -1327,25 +1334,32 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
// observeASTVars identifies the variables that are referenced in the computed AST and passed to
// the varsMgr so it knows what providers are being referenced. If a providers is not being
// referenced then the provider does not need to be running.
func (c *Coordinator) observeASTVars() {
func (c *Coordinator) observeASTVars(ctx context.Context) error {
if c.varsMgr == nil {
// No varsMgr (only happens in testing)
return
return nil
}
if c.ast == nil {
// No AST; no vars
c.varsMgr.Observe(nil)
return
var vars []string
if c.ast != nil {
inputs, ok := transpiler.Lookup(c.ast, "inputs")
if ok {
vars = inputs.Vars(vars, c.varsMgr.DefaultProvider())
}
outputs, ok := transpiler.Lookup(c.ast, "outputs")
if ok {
vars = outputs.Vars(vars, c.varsMgr.DefaultProvider())
}
}
inputs, ok := transpiler.Lookup(c.ast, "inputs")
if !ok {
// No inputs; no vars
c.varsMgr.Observe(nil)
return
updated, err := c.varsMgr.Observe(ctx, vars)
if err != nil {
// context cancel
return err
}
var vars []string
vars = inputs.Vars(vars)
c.varsMgr.Observe(vars)
if updated != nil {
// provided an updated set of vars (observed changed)
c.vars = updated
}
return nil
}

// processVars updates the transpiler vars in the Coordinator.
Expand Down Expand Up @@ -1421,6 +1435,8 @@ func (c *Coordinator) generateComponentModel() (err error) {
}()

ast := c.ast.ShallowClone()

// perform variable substitution for inputs
inputs, ok := transpiler.Lookup(ast, "inputs")
if ok {
renderedInputs, err := transpiler.RenderInputs(inputs, c.vars)
Expand All @@ -1433,6 +1449,20 @@ func (c *Coordinator) generateComponentModel() (err error) {
}
}

// perform variable substitution for outputs
// outputs only support the context variables (dynamic provides are not provide to the outputs)
outputs, ok := transpiler.Lookup(ast, "outputs")
if ok {
renderedOutputs, err := transpiler.RenderOutputs(outputs, c.vars)
if err != nil {
return fmt.Errorf("rendering outputs failed: %w", err)
}
err = transpiler.Insert(ast, renderedOutputs, "outputs")
if err != nil {
return fmt.Errorf("inserting rendered outputs failed: %w", err)
}
}

cfg, err := ast.Map()
if err != nil {
return fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err)
Expand Down
17 changes: 15 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ func BenchmarkCoordinator_generateComponentModel(b *testing.B) {
require.NoError(b, err)
vars := make([]*transpiler.Vars, len(varsMaps))
for i, vm := range varsMaps {
vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{})
vars[i], err = transpiler.NewVars(fmt.Sprintf("%d", i), vm, mapstr.M{}, "")
require.NoError(b, err)
}

Expand Down Expand Up @@ -1188,6 +1188,9 @@ func (l *configChange) Fail(err error) {
}

type fakeVarsManager struct {
varsMx sync.RWMutex
vars []*transpiler.Vars

varsCh chan []*transpiler.Vars
errCh chan error

Expand Down Expand Up @@ -1222,19 +1225,29 @@ func (f *fakeVarsManager) Watch() <-chan []*transpiler.Vars {
return f.varsCh
}

func (f *fakeVarsManager) Observe(observed []string) {
func (f *fakeVarsManager) Observe(ctx context.Context, observed []string) ([]*transpiler.Vars, error) {
f.observedMx.Lock()
defer f.observedMx.Unlock()
f.observed = observed
f.varsMx.RLock()
defer f.varsMx.RUnlock()
return f.vars, nil
}

func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) {
f.varsMx.Lock()
f.vars = vars
f.varsMx.Unlock()
select {
case <-ctx.Done():
case f.varsCh <- vars:
}
}

func (f *fakeVarsManager) DefaultProvider() string {
return ""
}

type fakeOTelManager struct {
updateCallback func(*confmap.Conf) error
result error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ inputs:
components = nil
vars, err := transpiler.NewVars("", map[string]interface{}{
"TEST_VAR": "input-id",
}, nil)
}, nil, "")
require.NoError(t, err, "Vars creation must succeed")
varsChan <- []*transpiler.Vars{vars}
coord.runLoopIteration(ctx)
Expand All @@ -1121,7 +1121,7 @@ inputs:
components = nil
vars, err = transpiler.NewVars("", map[string]interface{}{
"TEST_VAR": "changed-input-id",
}, nil)
}, nil, "")
require.NoError(t, err, "Vars creation must succeed")
varsChan <- []*transpiler.Vars{vars}
coord.runLoopIteration(ctx)
Expand Down Expand Up @@ -1239,7 +1239,7 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
// (Coordinator will only regenerate its component model when it has non-nil
// vars).
func emptyVars(t *testing.T) []*transpiler.Vars {
vars, err := transpiler.NewVars("", map[string]interface{}{}, nil)
vars, err := transpiler.NewVars("", map[string]interface{}{}, nil, "")
require.NoError(t, err, "Vars creation must succeed")
return []*transpiler.Vars{vars}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestDiagnosticVariables(t *testing.T) {
map[string]interface{}{
"testvar": "testvalue",
},
nil)
nil, "")
require.NoError(t, err)

expected := `
Expand Down
26 changes: 13 additions & 13 deletions internal/pkg/agent/transpiler/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Node interface {

// Vars adds to the array with the variables identified in the node. Returns the array in-case
// the capacity of the array had to be changed.
Vars([]string) []string
Vars([]string, string) []string

// Apply apply the current vars, returning the new value for the node. This does not modify the original Node.
Apply(*Vars) (Node, error)
Expand Down Expand Up @@ -182,10 +182,10 @@ func (d *Dict) Hash64With(h *xxhash.Digest) error {
}

// Vars returns a list of all variables referenced in the dictionary.
func (d *Dict) Vars(vars []string) []string {
func (d *Dict) Vars(vars []string, defaultProvider string) []string {
for _, v := range d.value {
k := v.(*Key)
vars = k.Vars(vars)
vars = k.Vars(vars, defaultProvider)
}
return vars
}
Expand Down Expand Up @@ -318,11 +318,11 @@ func (k *Key) Hash64With(h *xxhash.Digest) error {
}

// Vars returns a list of all variables referenced in the value.
func (k *Key) Vars(vars []string) []string {
func (k *Key) Vars(vars []string, defaultProvider string) []string {
if k.value == nil {
return vars
}
return k.value.Vars(vars)
return k.value.Vars(vars, defaultProvider)
}

// Apply applies the vars to the value. This does not modify the original node.
Expand Down Expand Up @@ -463,9 +463,9 @@ func (l *List) ShallowClone() Node {
}

// Vars returns a list of all variables referenced in the list.
func (l *List) Vars(vars []string) []string {
func (l *List) Vars(vars []string, defaultProvider string) []string {
for _, v := range l.value {
vars = v.Vars(vars)
vars = v.Vars(vars, defaultProvider)
}
return vars
}
Expand Down Expand Up @@ -552,12 +552,12 @@ func (s *StrVal) Hash64With(h *xxhash.Digest) error {
}

// Vars returns a list of all variables referenced in the string.
func (s *StrVal) Vars(vars []string) []string {
func (s *StrVal) Vars(vars []string, defaultProvider string) []string {
// errors are ignored (if there is an error determine the vars it will also error computing the policy)
_, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) {
vars = append(vars, variable)
return nil, nil, false
}, false)
}, false, defaultProvider)
return vars
}

Expand Down Expand Up @@ -613,7 +613,7 @@ func (s *IntVal) ShallowClone() Node {
}

// Vars does nothing. Cannot have variable in an IntVal.
func (s *IntVal) Vars(vars []string) []string {
func (s *IntVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down Expand Up @@ -691,7 +691,7 @@ func (s *UIntVal) Hash64With(h *xxhash.Digest) error {
}

// Vars does nothing. Cannot have variable in an UIntVal.
func (s *UIntVal) Vars(vars []string) []string {
func (s *UIntVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down Expand Up @@ -764,7 +764,7 @@ func (s *FloatVal) hashString() string {
}

// Vars does nothing. Cannot have variable in an FloatVal.
func (s *FloatVal) Vars(vars []string) []string {
func (s *FloatVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down Expand Up @@ -843,7 +843,7 @@ func (s *BoolVal) Hash64With(h *xxhash.Digest) error {
}

// Vars does nothing. Cannot have variable in an BoolVal.
func (s *BoolVal) Vars(vars []string) []string {
func (s *BoolVal) Vars(vars []string, defaultProvider string) []string {
return vars
}

Expand Down
Loading

0 comments on commit e4c8fb6

Please sign in to comment.