Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only run providers that are referenced in the policy #6169

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Only run providers referenced in the policy

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6169

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3609
30 changes: 30 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ type ConfigManager interface {
type VarsManager interface {
Runner

// Observe instructs the variables to observe.
Observe([]string)

// Watch returns the chanel to watch for variable changes.
Watch() <-chan []*transpiler.Vars
}
Expand Down Expand Up @@ -1235,6 +1238,9 @@ func (c *Coordinator) processConfigAgent(ctx context.Context, cfg *config.Config
return err
}

// pass the observed vars from the AST to the varsMgr
c.observeASTVars()

// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501

Expand Down Expand Up @@ -1313,6 +1319,30 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
return nil
}

// observeASTVars identifies the variables that are referenced in the computed AST and passed to
// the varsMgr so it knows what providers are being referenced. If a providers is not being
// referenced then the provider does not need to be running.
func (c *Coordinator) observeASTVars() {
if c.varsMgr == nil {
// No varsMgr (only happens in testing)
return
}
if c.ast == nil {
// No AST; no vars
c.varsMgr.Observe(nil)
return
}
inputs, ok := transpiler.Lookup(c.ast, "inputs")
if !ok {
// No inputs; no vars
c.varsMgr.Observe(nil)
return
}
var vars []string
vars = inputs.Vars(vars)
c.varsMgr.Observe(vars)
}

// processVars updates the transpiler vars in the Coordinator.
// Called on the main Coordinator goroutine.
func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) {
Expand Down
81 changes: 81 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
goruntime "runtime"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -327,6 +328,77 @@ func mustNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct {
return str
}

func TestCoordinator_VarsMgr_Observe(t *testing.T) {
coordCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
stateChan := coord.StateSubscribe(ctx, 32)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
// allowed error
err = nil
}
coordCh <- err
}()

// wait for it to be in starting state
waitForState(t, stateChan, func(state State) bool {
return state.State == agentclient.Starting &&
state.Message == "Waiting for initial configuration and composable variables"
}, 3*time.Second)

// set vars state should stay same (until config)
varsMgr.Vars(ctx, []*transpiler.Vars{{}})

// State changes happen asynchronously in the Coordinator goroutine, so
// wait a little bit to make sure no changes are reported; if the Vars
// call does trigger a change, it should happen relatively quickly.
select {
case <-stateChan:
assert.Fail(t, "Vars call shouldn't cause a state change")
case <-time.After(50 * time.Millisecond):
}

// set configuration that has variables present
cfg, err := config.NewConfigFrom(map[string]interface{}{
"inputs": []interface{}{
map[string]interface{}{
"type": "filestream",
"paths": []interface{}{
"${env.filestream_path|env.log_path|'/var/log/syslog'}",
},
},
map[string]interface{}{
"type": "windows",
"condition": "${host.platform} == 'windows'",
},
},
})
require.NoError(t, err)
cfgMgr.Config(ctx, cfg)

// healthy signals that the configuration has been computed
waitForState(t, stateChan, func(state State) bool {
return state.State == agentclient.Healthy && state.Message == "Running"
}, 3*time.Second)

// get the set observed vars from the fake vars manager
varsMgr.observedMx.Lock()
observed := varsMgr.observed
varsMgr.observedMx.Unlock()

// stop the coordinator
cancel()
err = <-coordCh
require.NoError(t, err)

// verify that the observed vars are the expected vars
assert.Equal(t, []string{"env.filestream_path", "env.log_path", "host.platform"}, observed)
}

func TestCoordinator_State_Starting(t *testing.T) {
coordCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1072,6 +1144,9 @@ func (l *configChange) Fail(err error) {
type fakeVarsManager struct {
varsCh chan []*transpiler.Vars
errCh chan error

observedMx sync.RWMutex
observed []string
}

func newFakeVarsManager() *fakeVarsManager {
Expand Down Expand Up @@ -1101,6 +1176,12 @@ func (f *fakeVarsManager) Watch() <-chan []*transpiler.Vars {
return f.varsCh
}

func (f *fakeVarsManager) Observe(observed []string) {
f.observedMx.Lock()
defer f.observedMx.Unlock()
f.observed = observed
}

func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) {
select {
case <-ctx.Done():
Expand Down
7 changes: 1 addition & 6 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,10 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
coord, configMgr, _, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
if err != nil {
return logReturn(l, err)
}
defer func() {
if composable != nil {
composable.Close()
}
}()

monitoringServer, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions internal/pkg/agent/transpiler/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Node interface {
// Hash compute a sha256 hash of the current node and recursively call any children.
Hash() []byte

// Vars adds to the array with the variables identified in the node. Returns the array in-case
// the capacity of the array had to be changed.
Vars([]string) []string
blakerouse marked this conversation as resolved.
Show resolved Hide resolved

// Apply apply the current vars, returning the new value for the node.
Apply(*Vars) (Node, error)

Expand Down Expand Up @@ -162,6 +166,15 @@ func (d *Dict) Hash() []byte {
return h.Sum(nil)
}

// Vars returns a list of all variables referenced in the dictionary.
func (d *Dict) Vars(vars []string) []string {
for _, v := range d.value {
k := v.(*Key)
vars = k.Vars(vars)
}
return vars
}

// Apply applies the vars to all the nodes in the dictionary.
func (d *Dict) Apply(vars *Vars) (Node, error) {
nodes := make([]Node, 0, len(d.value))
Expand Down Expand Up @@ -277,6 +290,14 @@ func (k *Key) Hash() []byte {
return h.Sum(nil)
}

// Vars returns a list of all variables referenced in the value.
func (k *Key) Vars(vars []string) []string {
if k.value == nil {
return vars
}
return k.value.Vars(vars)
}

// Apply applies the vars to the value.
func (k *Key) Apply(vars *Vars) (Node, error) {
if k.value == nil {
Expand Down Expand Up @@ -397,6 +418,14 @@ func (l *List) ShallowClone() Node {
return &List{value: nodes}
}

// Vars returns a list of all variables referenced in the list.
func (l *List) Vars(vars []string) []string {
for _, v := range l.value {
vars = v.Vars(vars)
}
return vars
}

// Apply applies the vars to all nodes in the list.
func (l *List) Apply(vars *Vars) (Node, error) {
nodes := make([]Node, 0, len(l.value))
Expand Down Expand Up @@ -472,6 +501,16 @@ func (s *StrVal) Hash() []byte {
return []byte(s.value)
}

// Vars returns a list of all variables referenced in the string.
func (s *StrVal) Vars(vars []string) []string {
// errors are ignored (if there is an error determine the vars it will also error computing the policy)
_, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) {
vars = append(vars, variable)
return nil, nil, false
}, false)
return vars
}

// Apply applies the vars to the string value.
func (s *StrVal) Apply(vars *Vars) (Node, error) {
return vars.Replace(s.value)
Expand Down Expand Up @@ -523,6 +562,11 @@ func (s *IntVal) ShallowClone() Node {
return s.Clone()
}

// Vars does nothing. Cannot have variable in an IntVal.
func (s *IntVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *IntVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -584,6 +628,11 @@ func (s *UIntVal) Hash() []byte {
return []byte(s.String())
}

// Vars does nothing. Cannot have variable in an UIntVal.
func (s *UIntVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *UIntVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -641,6 +690,11 @@ func (s *FloatVal) Hash() []byte {
return []byte(strconv.FormatFloat(s.value, 'f', -1, 64))
}

// Vars does nothing. Cannot have variable in an FloatVal.
func (s *FloatVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *FloatVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -703,6 +757,11 @@ func (s *BoolVal) Hash() []byte {
return falseVal
}

// Vars does nothing. Cannot have variable in an BoolVal.
func (s *BoolVal) Vars(vars []string) []string {
return vars
}

// Apply does nothing.
func (s *BoolVal) Apply(_ *Vars) (Node, error) {
return s, nil
Expand Down Expand Up @@ -982,6 +1041,11 @@ func attachProcessors(node Node, processors Processors) Node {

// Lookup accept an AST and a selector and return the matching Node at that position.
func Lookup(a *AST, selector Selector) (Node, bool) {
// Be defensive and ensure that the ast is usable.
if a == nil || a.root == nil {
return nil, false
}
blakerouse marked this conversation as resolved.
Show resolved Hide resolved

// Run through the graph and find matching nodes.
current := a.root
for _, part := range splitPath(selector) {
Expand Down
Loading