From 07b52b599a44bb1000bd4b5d2e7e712d24f6738d Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 29 Nov 2024 10:10:35 -0500 Subject: [PATCH] Fixes from code review. --- internal/pkg/agent/transpiler/ast.go | 37 ++---- internal/pkg/agent/transpiler/ast_test.go | 52 +++++++++ internal/pkg/agent/transpiler/vars.go | 97 +++++++++------- internal/pkg/composable/controller.go | 131 ++++++++++++++++------ 4 files changed, 207 insertions(+), 110 deletions(-) diff --git a/internal/pkg/agent/transpiler/ast.go b/internal/pkg/agent/transpiler/ast.go index 389fd8ec682..149818d502b 100644 --- a/internal/pkg/agent/transpiler/ast.go +++ b/internal/pkg/agent/transpiler/ast.go @@ -58,7 +58,8 @@ type Node interface { // Hash compute a sha256 hash of the current node and recursively call any children. Hash() []byte - // Vars return a list of referenced vars. + // Vars adds to the array with the variables identified in the node. Returns the array in-case + // the capacity of the array had to be changed. Vars([]string) []string // Apply apply the current vars, returning the new value for the node. @@ -502,35 +503,11 @@ func (s *StrVal) Hash() []byte { // Vars returns a list of all variables referenced in the string. func (s *StrVal) Vars(vars []string) []string { - value := s.value - matchIdxs := varsRegex.FindAllSubmatchIndex([]byte(value), -1) - if !validBrackets(value, matchIdxs) { - // brackets are not valid; unable to pull vars (computing the policy will fail) - return vars - } - for _, r := range matchIdxs { - for i := 0; i < len(r); i += 4 { - if value[r[i]+1] == '$' { - // match on an escaped var, this is not a real variable - continue - } - // match on a non-escaped var - extractedVars, err := extractVars(value[r[i+2]:r[i+3]]) - if err != nil { - // variable parsing failed (computing the policy will fail) - return vars - } - for _, val := range extractedVars { - switch val.(type) { - case *constString: - // not a variable - case *varString: - // found variable add it to the array - vars = append(vars, val.Value()) - } - } - } - } + // errors are ignored (if there is an error determine the vars it will also error computing the policy) + _, _ = replaceVars(s.value, func(variable string) (Node, Processors, bool) { + vars = append(vars, variable) + return nil, nil, false + }, false) return vars } diff --git a/internal/pkg/agent/transpiler/ast_test.go b/internal/pkg/agent/transpiler/ast_test.go index e5cf185535f..cdbaff5df7a 100644 --- a/internal/pkg/agent/transpiler/ast_test.go +++ b/internal/pkg/agent/transpiler/ast_test.go @@ -1019,6 +1019,58 @@ func TestVars(t *testing.T) { } } +func TestLookup(t *testing.T) { + tests := map[string]struct { + ast *AST + selector Selector + node Node + ok bool + }{ + "nil": { + ast: nil, + selector: "", + node: nil, + ok: false, + }, + "noroot": { + ast: &AST{}, + selector: "", + node: nil, + ok: false, + }, + "notfound": { + ast: &AST{ + root: NewDict([]Node{NewKey("entry", NewDict([]Node{ + NewKey("var1", NewStrVal("value1")), + NewKey("var2", NewStrVal("value2")), + }))}), + }, + selector: "entry.var3", + node: nil, + ok: false, + }, + "found": { + ast: &AST{ + root: NewDict([]Node{NewKey("entry", NewDict([]Node{ + NewKey("var1", NewStrVal("value1")), + NewKey("var2", NewStrVal("value2")), + }))}), + }, + selector: "entry.var2", + node: NewKey("var2", NewStrVal("value2")), + ok: true, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + node, ok := Lookup(test.ast, test.selector) + if assert.Equal(t, test.ok, ok) { + assert.Equal(t, test.node, node) + } + }) + } +} + func mustMakeVars(mapping map[string]interface{}) *Vars { v, err := NewVars("", mapping, nil) if err != nil { diff --git a/internal/pkg/agent/transpiler/vars.go b/internal/pkg/agent/transpiler/vars.go index bcf845b7c6f..71bd8bd4cb6 100644 --- a/internal/pkg/agent/transpiler/vars.go +++ b/internal/pkg/agent/transpiler/vars.go @@ -54,6 +54,56 @@ func NewVarsWithProcessorsFromAst(id string, tree *AST, processorKey string, pro // Replace returns a new value based on variable replacement. func (v *Vars) Replace(value string) (Node, error) { + return replaceVars(value, func(variable string) (Node, Processors, bool) { + var processors Processors + node, ok := v.lookupNode(variable) + if ok && v.processorsKey != "" && varPrefixMatched(variable, v.processorsKey) { + processors = v.processors + } + return node, processors, ok + }, true) +} + +// ID returns the unique ID for the vars. +func (v *Vars) ID() string { + return v.id +} + +// Lookup returns the value from the vars. +func (v *Vars) Lookup(name string) (interface{}, bool) { + // lookup in the AST tree + return v.tree.Lookup(name) +} + +// Map transforms the variables into a map[string]interface{} and will abort and return any errors related +// to type conversion. +func (v *Vars) Map() (map[string]interface{}, error) { + return v.tree.Map() +} + +// lookupNode performs a lookup on the AST, but keeps the result as a `Node`. +// +// This is different from `Lookup` which returns the actual type, not the AST type. +func (v *Vars) lookupNode(name string) (Node, bool) { + // check if the value can be retrieved from a FetchContextProvider + for providerName, provider := range v.fetchContextProviders { + if varPrefixMatched(name, providerName) { + fetchProvider, ok := provider.(composable.FetchContextProvider) + if !ok { + return &StrVal{value: ""}, false + } + fval, found := fetchProvider.Fetch(name) + if found { + return &StrVal{value: fval}, true + } + return &StrVal{value: ""}, false + } + } + // lookup in the AST tree + return Lookup(v.tree, name) +} + +func replaceVars(value string, replacer func(variable string) (Node, Processors, bool), reqMatch bool) (Node, error) { var processors Processors matchIdxs := varsRegex.FindAllSubmatchIndex([]byte(value), -1) if !validBrackets(value, matchIdxs) { @@ -81,11 +131,11 @@ func (v *Vars) Replace(value string) (Node, error) { result += value[lastIndex:r[0]] + val.Value() set = true case *varString: - node, ok := v.lookupNode(val.Value()) + node, nodeProcessors, ok := replacer(val.Value()) if ok { node := nodeToValue(node) - if v.processorsKey != "" && varPrefixMatched(val.Value(), v.processorsKey) { - processors = v.processors + if nodeProcessors != nil { + processors = nodeProcessors } if r[i] == 0 && r[i+1] == len(value) { // possible for complete replacement of object, because the variable @@ -100,7 +150,7 @@ func (v *Vars) Replace(value string) (Node, error) { break } } - if !set { + if !set && reqMatch { return NewStrVal(""), ErrNoMatch } lastIndex = r[1] @@ -109,45 +159,6 @@ func (v *Vars) Replace(value string) (Node, error) { return NewStrValWithProcessors(result+value[lastIndex:], processors), nil } -// ID returns the unique ID for the vars. -func (v *Vars) ID() string { - return v.id -} - -// Lookup returns the value from the vars. -func (v *Vars) Lookup(name string) (interface{}, bool) { - // lookup in the AST tree - return v.tree.Lookup(name) -} - -// Map transforms the variables into a map[string]interface{} and will abort and return any errors related -// to type conversion. -func (v *Vars) Map() (map[string]interface{}, error) { - return v.tree.Map() -} - -// lookupNode performs a lookup on the AST, but keeps the result as a `Node`. -// -// This is different from `Lookup` which returns the actual type, not the AST type. -func (v *Vars) lookupNode(name string) (Node, bool) { - // check if the value can be retrieved from a FetchContextProvider - for providerName, provider := range v.fetchContextProviders { - if varPrefixMatched(name, providerName) { - fetchProvider, ok := provider.(composable.FetchContextProvider) - if !ok { - return &StrVal{value: ""}, false - } - fval, found := fetchProvider.Fetch(name) - if found { - return &StrVal{value: fval}, true - } - return &StrVal{value: ""}, false - } - } - // lookup in the AST tree - return Lookup(v.tree, name) -} - // nodeToValue ensures that the node is an actual value. func nodeToValue(node Node) Node { switch n := node.(type) { diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 04cf1fd35e3..b983e48e238 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -47,9 +47,9 @@ type controller struct { errCh chan error restartInterval time.Duration - managed bool - contextProviders map[string]contextProvider - dynamicProviders map[string]dynamicProvider + managed bool + contextProviderBuilders map[string]contextProvider + dynamicProviderBuilders map[string]dynamicProvider contextProviderStates map[string]*contextProviderState dynamicProviderStates map[string]*dynamicProviderState @@ -107,16 +107,16 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) } return &controller{ - logger: l, - ch: make(chan []*transpiler.Vars, 1), - observedCh: make(chan map[string]bool, 1), - errCh: make(chan error), - managed: managed, - restartInterval: restartInterval, - contextProviders: contextProviders, - dynamicProviders: dynamicProviders, - contextProviderStates: make(map[string]*contextProviderState), - dynamicProviderStates: make(map[string]*dynamicProviderState), + logger: l, + ch: make(chan []*transpiler.Vars, 1), + observedCh: make(chan map[string]bool, 1), + errCh: make(chan error), + managed: managed, + restartInterval: restartInterval, + contextProviderBuilders: contextProviders, + dynamicProviderBuilders: dynamicProviders, + contextProviderStates: make(map[string]*contextProviderState), + dynamicProviderStates: make(map[string]*dynamicProviderState), }, nil } @@ -130,8 +130,6 @@ func (c *controller) Run(ctx context.Context) error { localCtx, cancel := context.WithCancel(ctx) defer cancel() - fetchContextProviders := mapstr.M{} - c.logger.Debugf("Started controller for composable inputs") t := time.NewTimer(100 * time.Millisecond) @@ -158,12 +156,36 @@ func (c *controller) Run(ctx context.Context) error { wg.Wait() }() + // synchronize the fetch providers through a channel + var fetchProvidersLock sync.RWMutex + var fetchProviders mapstr.M + fetchCh := make(chan fetchProvider) + go func() { + for { + select { + case <-localCtx.Done(): + return + case msg := <-fetchCh: + fetchProvidersLock.Lock() + if msg.fetchProvider == nil { + _ = fetchProviders.Delete(msg.name) + } else { + _, _ = fetchProviders.Put(msg.name, msg.fetchProvider) + } + fetchProvidersLock.Unlock() + } + } + }() + // send initial vars state - err := c.sendVars(ctx, fetchContextProviders) + fetchProvidersLock.RLock() + err := c.sendVars(ctx, fetchProviders) if err != nil { + fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context return err } + fetchProvidersLock.RUnlock() // performs debounce of notifies; accumulates them into 100 millisecond chunks for { @@ -173,11 +195,13 @@ func (c *controller) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case observed := <-c.observedCh: - c.handleObserved(localCtx, &wg, stateChangedChan, fetchContextProviders, observed) - t.Reset(100 * time.Millisecond) - c.logger.Debugf("Observed state changed for composable inputs; debounce started") - drainChan(stateChangedChan) - break DEBOUNCE + changed := c.handleObserved(localCtx, &wg, fetchCh, stateChangedChan, observed) + if changed { + t.Reset(100 * time.Millisecond) + c.logger.Debugf("Observed state changed for composable inputs; debounce started") + drainChan(stateChangedChan) + break DEBOUNCE + } case <-stateChangedChan: t.Reset(100 * time.Millisecond) c.logger.Debugf("Variable state changed for composable inputs; debounce started") @@ -196,11 +220,14 @@ func (c *controller) Run(ctx context.Context) error { } // send the vars to the watcher - err = c.sendVars(ctx, fetchContextProviders) + fetchProvidersLock.RLock() + err := c.sendVars(ctx, fetchProviders) if err != nil { + fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context return err } + fetchProvidersLock.RUnlock() } } @@ -254,7 +281,9 @@ func (c *controller) Observe(vars []string) { c.observedCh <- topLevel } -func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, stateChangedChan chan bool, fetchContextProviders mapstr.M, observed map[string]bool) { +func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, fetchCh chan fetchProvider, stateChangedChan chan bool, observed map[string]bool) bool { + changed := false + // get the list of already running, so we can determine a list that needs to be stopped runningCtx := make(map[string]*contextProviderState, len(c.contextProviderStates)) runningDyn := make(map[string]*dynamicProviderState, len(c.dynamicProviderStates)) @@ -284,20 +313,20 @@ func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, sta continue } - contextInfo, ok := c.contextProviders[name] + contextInfo, ok := c.contextProviderBuilders[name] if ok { - state := c.startContextProvider(ctx, wg, stateChangedChan, name, contextInfo) + state := c.startContextProvider(ctx, wg, fetchCh, stateChangedChan, name, contextInfo) if state != nil { + changed = true c.contextProviderStates[name] = state - if p, ok := state.provider.(corecomp.FetchContextProvider); ok { - _, _ = fetchContextProviders.Put(name, p) - } + } } - dynamicInfo, ok := c.dynamicProviders[name] + dynamicInfo, ok := c.dynamicProviderBuilders[name] if ok { state := c.startDynamicProvider(ctx, wg, stateChangedChan, name, dynamicInfo) if state != nil { + changed = true c.dynamicProviderStates[name] = state } } @@ -306,18 +335,22 @@ func (c *controller) handleObserved(ctx context.Context, wg *sync.WaitGroup, sta // running remaining need to be stopped for name, state := range runningCtx { + changed = true state.logger.Infof("Stopping provider %q", name) state.canceller() delete(c.contextProviderStates, name) } for name, state := range runningDyn { + changed = true state.logger.Infof("Stopping dynamic provider %q", name) state.canceller() delete(c.dynamicProviderStates, name) } + + return changed } -func (c *controller) startContextProvider(ctx context.Context, wg *sync.WaitGroup, stateChangedChan chan bool, name string, info contextProvider) *contextProviderState { +func (c *controller) startContextProvider(ctx context.Context, wg *sync.WaitGroup, fetchCh chan fetchProvider, stateChangedChan chan bool, name string, info contextProvider) *contextProviderState { wg.Add(1) l := c.logger.Named(strings.Join([]string{"providers", name}, ".")) @@ -344,17 +377,30 @@ func (c *controller) startContextProvider(ctx context.Context, wg *sync.WaitGrou case <-time.After(c.restartInterval): // wait restart interval and then try again } + continue + } + + fp, fpok := provider.(corecomp.FetchContextProvider) + if fpok { + sendFetchProvider(ctx, fetchCh, name, fp) } - state.provider = provider err = provider.Run(ctx, state) closeProvider(l, name, provider) if errors.Is(err, context.Canceled) { // valid exit + if fpok { + // turn off fetch provider + sendFetchProvider(ctx, fetchCh, name, nil) + } return } // all other exits are bad, even a nil error l.Errorf("provider %q failed to run (will retry in %s): %s", name, c.restartInterval.String(), err) + if fpok { + // turn off fetch provider + sendFetchProvider(ctx, fetchCh, name, nil) + } select { case <-ctx.Done(): return @@ -366,6 +412,13 @@ func (c *controller) startContextProvider(ctx context.Context, wg *sync.WaitGrou return state } +func sendFetchProvider(ctx context.Context, fetchCh chan fetchProvider, name string, fp corecomp.FetchContextProvider) { + select { + case <-ctx.Done(): + case fetchCh <- fetchProvider{name: name, fetchProvider: fp}: + } +} + func (c *controller) startDynamicProvider(ctx context.Context, wg *sync.WaitGroup, stateChangedChan chan bool, name string, info dynamicProvider) *dynamicProviderState { wg.Add(1) l := c.logger.Named(strings.Join([]string{"providers", name}, ".")) @@ -392,9 +445,10 @@ func (c *controller) startDynamicProvider(ctx context.Context, wg *sync.WaitGrou case <-time.After(c.restartInterval): // wait restart interval and then try again } + continue } - err = state.provider.Run(state) + err = provider.Run(state) closeProvider(l, name, provider) if errors.Is(err, context.Canceled) { return @@ -455,13 +509,17 @@ type dynamicProvider struct { cfg *config.Config } +type fetchProvider struct { + name string + fetchProvider corecomp.FetchContextProvider +} + type contextProviderState struct { context.Context - provider corecomp.ContextProvider - lock sync.RWMutex - mapping *transpiler.AST - signal chan bool + lock sync.RWMutex + mapping *transpiler.AST + signal chan bool logger *logger.Logger canceller context.CancelFunc @@ -519,7 +577,6 @@ type dynamicProviderMapping struct { type dynamicProviderState struct { context.Context - provider DynamicProvider lock sync.Mutex mappings map[string]dynamicProviderMapping signal chan bool