From faa8675ab6440783623e9496ce5e9e4bff322883 Mon Sep 17 00:00:00 2001 From: Mikhail Scherba <41360396+miklezzzz@users.noreply.github.com> Date: Fri, 12 Jul 2024 12:16:53 +0400 Subject: [PATCH] update scheduler logic regarding terminator extenders (#489) Signed-off-by: Mikhail Scherba --- pkg/module_manager/module_manager.go | 32 +-- .../extenders/mock/extenders_mock.go | 102 +++++++ pkg/module_manager/scheduler/scheduler.go | 92 +++--- .../scheduler/scheduler_test.go | 266 +++++++++++++++++- .../015-admission-policy-engine/enabled | 0 .../scheduler/testdata/042-kube-dns/enabled | 0 6 files changed, 434 insertions(+), 58 deletions(-) create mode 100644 pkg/module_manager/scheduler/extenders/mock/extenders_mock.go create mode 100755 pkg/module_manager/scheduler/testdata/015-admission-policy-engine/enabled create mode 100755 pkg/module_manager/scheduler/testdata/042-kube-dns/enabled diff --git a/pkg/module_manager/module_manager.go b/pkg/module_manager/module_manager.go index d964edba..d4adc3bb 100644 --- a/pkg/module_manager/module_manager.go +++ b/pkg/module_manager/module_manager.go @@ -291,39 +291,39 @@ func (mm *ModuleManager) Init() error { gv, err := mm.loadGlobalValues() if err != nil { - return err + return fmt.Errorf("couldn't load global values: %w", err) } staticExtender, err := static_extender.NewExtender(mm.ModulesDir) if err != nil { - return err + return fmt.Errorf("couldn't create static extender: %w", err) } if err := mm.moduleScheduler.AddExtender(staticExtender); err != nil { - return err + return fmt.Errorf("couldn't add static extender: %w", err) } err = mm.registerGlobalModule(gv.globalValues, gv.configSchema, gv.valuesSchema) if err != nil { - return err + return fmt.Errorf("couldn't register global module: %w", err) } kubeConfigExtender := kube_config_extender.NewExtender(mm.dependencies.KubeConfigManager) if err := mm.moduleScheduler.AddExtender(kubeConfigExtender); err != nil { - return err + return fmt.Errorf("couldn't add kube config extender: %w", err) } scriptEnabledExtender, err := script_extender.NewExtender(mm.TempDir) if err != nil { - return err + return fmt.Errorf("couldn't create script_enabled extender: %w", err) } if err := mm.moduleScheduler.AddExtender(scriptEnabledExtender); err != nil { - return err + return fmt.Errorf("couldn't add scrpt_enabled extender: %w", err) } // by this point we must have all required scheduler extenders attached if err := mm.moduleScheduler.ApplyExtenders(app.AppliedExtenders); err != nil { - return err + return fmt.Errorf("couldn't apply extenders to the module scheduler: %w", err) } return mm.registerModules(scriptEnabledExtender) @@ -645,22 +645,6 @@ func (mm *ModuleManager) RunModuleHook(moduleName, hookName string, binding Bind func (mm *ModuleManager) HandleKubeEvent(kubeEvent KubeEvent, createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo), createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo)) { mm.LoopByBinding(OnKubernetesEvent, func(gh *hooks.GlobalHook, m *modules.BasicModule, mh *hooks.ModuleHook) { - defer func() { - if err := recover(); err != nil { - logEntry := log.WithField("function", "HandleKubeEvent").WithField("event", "OnKubernetesEvent") - - if gh != nil { - logEntry.WithField("GlobalHook name", gh.GetName()).WithField("GlobakHook path", gh.GetPath()) - } - - if mh != nil { - logEntry.WithField("ModuleHook name", mh.GetName()).WithField("ModuleHook path", mh.GetPath()) - } - - logEntry.Errorf("panic occurred: %s", err) - } - }() - if gh != nil { if gh.GetHookController().CanHandleKubeEvent(kubeEvent) { gh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) { diff --git a/pkg/module_manager/scheduler/extenders/mock/extenders_mock.go b/pkg/module_manager/scheduler/extenders/mock/extenders_mock.go new file mode 100644 index 00000000..c2a5df01 --- /dev/null +++ b/pkg/module_manager/scheduler/extenders/mock/extenders_mock.go @@ -0,0 +1,102 @@ +// a bunch of mocked extenders for tests +package extenders_mock + +import "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders" + +type FilterOne struct{} + +func (f *FilterOne) Name() extenders.ExtenderName { + return extenders.ExtenderName("FilterOne") +} + +func (f *FilterOne) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *FilterOne) IsTerminator() bool { + return false +} + +type FilterTwo struct{} + +func (f *FilterTwo) Name() extenders.ExtenderName { + return extenders.ExtenderName("FilterTwo") +} + +func (f *FilterTwo) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *FilterTwo) IsTerminator() bool { + return false +} + +type FilterThree struct{} + +func (f *FilterThree) Name() extenders.ExtenderName { + return extenders.ExtenderName("FilterThree") +} + +func (f *FilterThree) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *FilterThree) IsTerminator() bool { + return false +} + +type FilterFour struct{} + +func (f *FilterFour) Name() extenders.ExtenderName { + return extenders.ExtenderName("FilterOne") +} + +func (f *FilterFour) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *FilterFour) IsTerminator() bool { + return false +} + +type TerminatorOne struct{} + +func (f *TerminatorOne) Name() extenders.ExtenderName { + return extenders.ExtenderName("TerminatorOne") +} + +func (f *TerminatorOne) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *TerminatorOne) IsTerminator() bool { + return true +} + +type TerminatorTwo struct{} + +func (f *TerminatorTwo) Name() extenders.ExtenderName { + return extenders.ExtenderName("TerminatorTwo") +} + +func (f *TerminatorTwo) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *TerminatorTwo) IsTerminator() bool { + return true +} + +type TerminatorThree struct{} + +func (f *TerminatorThree) Name() extenders.ExtenderName { + return extenders.ExtenderName("TerminatorThree") +} + +func (f *TerminatorThree) Filter(_ string, _ map[string]string) (*bool, error) { + return nil, nil +} + +func (f *TerminatorThree) IsTerminator() bool { + return true +} diff --git a/pkg/module_manager/scheduler/scheduler.go b/pkg/module_manager/scheduler/scheduler.go index a3df07cd..27e0b152 100644 --- a/pkg/module_manager/scheduler/scheduler.go +++ b/pkg/module_manager/scheduler/scheduler.go @@ -32,11 +32,16 @@ var defaultAppliedExtenders = []extenders.ExtenderName{ script_extender.Name, } +type extenderContainer struct { + ext extenders.Extender + filterAhead bool +} + type Scheduler struct { ctx context.Context // list of extenders to cycle over on a run - extenders []extenders.Extender + extenders []extenderContainer extCh chan extenders.ExtenderEvent // graph visualization graphImage image.Image @@ -66,7 +71,7 @@ func NewScheduler(ctx context.Context) *Scheduler { } return &Scheduler{ ctx: ctx, - extenders: make([]extenders.Extender, 0), + extenders: make([]extenderContainer, 0), extCh: make(chan extenders.ExtenderEvent, 1), dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic()), diff: make(map[string]bool), @@ -222,8 +227,8 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error { appliedExtenders = defaultAppliedExtenders } else { availableExtenders := make(map[extenders.ExtenderName]bool, len(s.extenders)) - for _, ext := range s.extenders { - availableExtenders[ext.Name()] = true + for _, e := range s.extenders { + availableExtenders[e.ext.Name()] = true } extendersFromEnv := strings.Split(extendersEnv, ",") @@ -246,12 +251,12 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error { } } - newExtenders := []extenders.Extender{} + newExtenders := []extenderContainer{} for _, appliedExt := range appliedExtenders { - for _, ext := range s.extenders { - if ext.Name() == appliedExt { - newExtenders = append(newExtenders, ext) - if ne, ok := ext.(extenders.NotificationExtender); ok { + for _, e := range s.extenders { + if e.ext.Name() == appliedExt { + newExtenders = append(newExtenders, e) + if ne, ok := e.ext.(extenders.NotificationExtender); ok { ne.SetNotifyChannel(s.ctx, s.extCh) } break @@ -261,24 +266,38 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error { s.extenders = newExtenders + // set some extenders' meta + s.setExtendersMeta() + finalList := []extenders.ExtenderName{} - for _, ext := range s.extenders { - finalList = append(finalList, ext.Name()) + for _, e := range s.extenders { + finalList = append(finalList, e.ext.Name()) } - log.Infof("The list of applied module extenders: [%s]", finalList) + log.Infof("The list of applied module extenders: %s", finalList) return nil } +// setExtendersMeta and some extra meta to the extenders that lets terminators know if there are any filtering extenders left in the list +func (s *Scheduler) setExtendersMeta() { + var filterAhead bool + for i := len(s.extenders) - 1; i >= 0; i-- { + s.extenders[i].filterAhead = filterAhead + if !filterAhead && !s.extenders[i].ext.IsTerminator() { + filterAhead = true + } + } +} + // AddExtender adds a new extender to the slice of the extenders that are used to determine modules' states func (s *Scheduler) AddExtender(ext extenders.Extender) error { - for _, ex := range s.extenders { - if ex.Name() == ext.Name() { + for _, e := range s.extenders { + if e.ext.Name() == ext.Name() { return fmt.Errorf("extender %s already added", ext.Name()) } } - s.extenders = append(s.extenders, ext) + s.extenders = append(s.extenders, extenderContainer{ext: ext}) return nil } @@ -403,9 +422,9 @@ func (s *Scheduler) RecalculateGraph(logLabels map[string]string) (bool, []strin // Filter returns filtering result for the specified extender and module func (s *Scheduler) Filter(extName extenders.ExtenderName, moduleName string, logLabels map[string]string) (*bool, error) { - for _, ex := range s.extenders { - if ex.Name() == extName { - return ex.Filter(moduleName, logLabels) + for _, e := range s.extenders { + if e.ext.Name() == extName { + return e.ext.Filter(moduleName, logLabels) } } return nil, fmt.Errorf("extender %s not found", extName) @@ -424,7 +443,7 @@ func (s *Scheduler) recalculateGraphState(logLabels map[string]string) ( /* Grap names, err := graph.StableTopologicalSort(s.dag, moduleSortFunc) if err != nil { - errList = append(errList, err.Error()) + errList = append(errList, fmt.Sprintf("couldn't perform stable topological sort: %s", err.Error())) s.errList = errList return true, updByDiff } @@ -444,31 +463,40 @@ outerCycle: moduleName := vertex.GetName() vBuf[moduleName] = &vertexState{} - for _, ex := range s.extenders { - // if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against a terminator - if ok := ex.IsTerminator(); ok && !vBuf[moduleName].enabled { - continue + for _, e := range s.extenders { + // if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against all other terminators + if e.ext.IsTerminator() && !vBuf[moduleName].enabled && !e.filterAhead { + break } - moduleStatus, err := ex.Filter(moduleName, logLabels) + moduleStatus, err := e.ext.Filter(moduleName, logLabels) if err != nil { if permanent, ok := err.(*exerror.PermanentError); ok { - errList = append(errList, permanent.Error()) + errList = append(errList, fmt.Sprintf("%s extender failed to filter %s module: %s", e.ext.Name(), moduleName, permanent.Error())) break outerCycle } } if moduleStatus != nil { // if current extender is a terminating one and it says to disable - stop cycling over remaining extenders and disable the module - if ok := ex.IsTerminator(); ok { - if !*moduleStatus && vBuf[moduleName].enabled { - vBuf[moduleName].enabled = *moduleStatus - vBuf[moduleName].updatedBy = string(ex.Name()) + if e.ext.IsTerminator() { + // if disabled - terminate filtering + if !*moduleStatus { + if vBuf[moduleName].enabled || e.filterAhead { + vBuf[moduleName].enabled = *moduleStatus + vBuf[moduleName].updatedBy = string(e.ext.Name()) + } + break + } + + // if enabled and there are some other filtering extenders ahead - continue filtering + if e.filterAhead { + continue } break } vBuf[moduleName].enabled = *moduleStatus - vBuf[moduleName].updatedBy = string(ex.Name()) + vBuf[moduleName].updatedBy = string(e.ext.Name()) } } @@ -487,8 +515,8 @@ outerCycle: } // reset extenders' states if needed (mostly for enabled_script extender) - for _, ex := range s.extenders { - if re, ok := ex.(extenders.ResettableExtender); ok { + for _, e := range s.extenders { + if re, ok := e.ext.(extenders.ResettableExtender); ok { re.Reset() } } diff --git a/pkg/module_manager/scheduler/scheduler_test.go b/pkg/module_manager/scheduler/scheduler_test.go index 64b24ccd..dbb037cb 100644 --- a/pkg/module_manager/scheduler/scheduler_test.go +++ b/pkg/module_manager/scheduler/scheduler_test.go @@ -14,6 +14,7 @@ import ( "github.com/flant/addon-operator/pkg/kube_config_manager/config" "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/dynamically_enabled" "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/kube_config" + extender_mock "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/mock" "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/script_enabled" "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/static" "github.com/flant/addon-operator/pkg/module_manager/scheduler/node" @@ -107,6 +108,49 @@ nodeLocalDnsEnabled: false assert.NoError(t, err) } +func TestApplyExtenders(t *testing.T) { + values := ` +# CE Bundle "Default" +nodeLocalDnsEnabled: true +` + tmp, err := os.MkdirTemp(t.TempDir(), "ApplyExtenders") + require.NoError(t, err) + + s := NewScheduler(context.TODO()) + + valuesFile := filepath.Join(tmp, "values.yaml") + err = os.WriteFile(valuesFile, []byte(values), 0o644) + require.NoError(t, err) + + se, err := static.NewExtender(tmp) + assert.NoError(t, err) + + err = s.AddExtender(se) + assert.NoError(t, err) + + err = s.ApplyExtenders("A,Static") + assert.Equal(t, errors.New("couldn't find A extender in the list of available extenders"), err) + + err = s.ApplyExtenders("A,B,Static") + assert.Equal(t, errors.New("couldn't find A extender in the list of available extenders"), err) + + err = s.ApplyExtenders("A,B") + assert.Equal(t, errors.New("couldn't find A extender in the list of available extenders"), err) + + err = s.ApplyExtenders("A,Static,B") + assert.Equal(t, errors.New("couldn't find A extender in the list of available extenders"), err) + + err = s.ApplyExtenders("Static,B,A") + assert.Equal(t, errors.New("couldn't find B extender in the list of available extenders"), err) + + err = s.ApplyExtenders("Static") + assert.NoError(t, err) + + // finalize + err = os.RemoveAll(tmp) + assert.NoError(t, err) +} + func TestGetEnabledModuleNames(t *testing.T) { values := ` # CE Bundle "Default" @@ -290,6 +334,225 @@ func TestAddModuleVertex(t *testing.T) { assert.Equal(t, graph.ErrEdgeNotFound, err) } +func TestSetExtendersMeta(t *testing.T) { + s := NewScheduler(context.TODO()) + + err := s.AddExtender(&extender_mock.FilterOne{}) + assert.NoError(t, err) + err = s.AddExtender(&extender_mock.FilterTwo{}) + assert.NoError(t, err) + err = s.AddExtender(&extender_mock.FilterThree{}) + assert.NoError(t, err) + err = s.AddExtender(&extender_mock.TerminatorOne{}) + assert.NoError(t, err) + err = s.AddExtender(&extender_mock.TerminatorTwo{}) + assert.NoError(t, err) + err = s.ApplyExtenders("FilterOne,FilterTwo,FilterThree,TerminatorOne,TerminatorTwo") + assert.NoError(t, err) + + expected := []bool{ + true, + true, + false, + false, + false, + } + + for i, e := range s.extenders { + if expected[i] != e.filterAhead { + t.Errorf("extender's %s filterAhead value %v not equal to expected %v", e.ext.Name(), e.filterAhead, expected[i]) + } + } + + err = s.ApplyExtenders("TerminatorOne,TerminatorTwo,FilterOne,FilterTwo,FilterThree") + assert.NoError(t, err) + + expected = []bool{ + true, + true, + true, + true, + false, + } + + for i, e := range s.extenders { + if expected[i] != e.filterAhead { + t.Errorf("extender's %s filterAhead value %v not equal to expected %v", e.ext.Name(), e.filterAhead, expected[i]) + } + } + + err = s.AddExtender(&extender_mock.TerminatorThree{}) + assert.NoError(t, err) + err = s.ApplyExtenders("TerminatorOne,FilterOne,TerminatorTwo,FilterTwo,FilterThree,TerminatorThree") + assert.NoError(t, err) + + expected = []bool{ + true, + true, + true, + true, + false, + false, + } + + for i, e := range s.extenders { + if expected[i] != e.filterAhead { + t.Errorf("extender's %s filterAhead value %v not equal to expected %v", e.ext.Name(), e.filterAhead, expected[i]) + } + } +} + +func TestExtendersOrder(t *testing.T) { + values := ` +admissionPolicyEngineEnabled: true +kubeDnsEnabled: false +` + logLabels := map[string]string{"source": "TestExtendersOrder"} + basicModules := []*node_mock.MockModule{ + { + Name: "admission-policy-engine", + Order: 15, + EnabledScriptResult: true, + Path: "./testdata/015-admission-policy-engine/", + }, + { + Name: "kube-dns", + Order: 42, + EnabledScriptResult: false, + }, + { + Name: "ingress-nginx", + Order: 420, + EnabledScriptResult: true, + }, + } + s := NewScheduler(context.TODO()) + for _, m := range basicModules { + err := s.AddModuleVertex(m) + assert.NoError(t, err) + } + + tmp, err := os.MkdirTemp(t.TempDir(), "values-test") + require.NoError(t, err) + valuesFile := filepath.Join(tmp, "values.yaml") + err = os.WriteFile(valuesFile, []byte(values), 0o644) + require.NoError(t, err) + + se, err := static.NewExtender(tmp) + assert.NoError(t, err) + + err = s.AddExtender(se) + assert.NoError(t, err) + + require.NoError(t, err) + + scripte, err := script_enabled.NewExtender(tmp) + assert.NoError(t, err) + err = s.AddExtender(scripte) + assert.NoError(t, err) + + for _, v := range basicModules { + scripte.AddBasicModule(v) + } + + // terminator goes last + err = s.ApplyExtenders("Static,ScriptEnabled") + assert.NoError(t, err) + + updated, verticesToUpdate := s.RecalculateGraph(logLabels) + assert.Equal(t, true, updated) + + _, diff, err := s.GetGraphState(logLabels) + assert.NoError(t, err) + + expectedSummary := map[string]bool{ + "admission-policy-engine/Static": true, + "kube-dns/Static": false, + "ingress-nginx/": false, + } + + expectedDiff := map[string]bool{ + "admission-policy-engine": true, + } + + expectedVerticesToUpdate := []string{ + "admission-policy-engine", + "kube-dns", + } + + summary, err := s.PrintSummary() + assert.NoError(t, err) + + assert.Equal(t, expectedSummary, summary) + assert.Equal(t, expectedDiff, diff) + assert.Equal(t, expectedVerticesToUpdate, verticesToUpdate) + + // revert extenders order + err = s.ApplyExtenders("ScriptEnabled,Static") + assert.NoError(t, err) + + expectedSummary = map[string]bool{ + "admission-policy-engine/Static": true, + "kube-dns/Static": false, + "ingress-nginx/": false, + } + + expectedDiff = map[string]bool{} + + expectedVerticesToUpdate = []string{} + + updated, verticesToUpdate = s.RecalculateGraph(logLabels) + assert.Equal(t, false, updated) + + _, diff, err = s.GetGraphState(logLabels) + assert.NoError(t, err) + + summary, err = s.PrintSummary() + assert.NoError(t, err) + + assert.Equal(t, expectedSummary, summary) + assert.Equal(t, expectedDiff, diff) + assert.Equal(t, expectedVerticesToUpdate, verticesToUpdate) + + // update script_enabled extender so that the module is disabled + basicModules[0].EnabledScriptResult = false + + for _, v := range basicModules { + scripte.AddBasicModule(v) + } + + expectedSummary = map[string]bool{ + "admission-policy-engine/ScriptEnabled": false, + "kube-dns/Static": false, + "ingress-nginx/": false, + } + + expectedDiff = map[string]bool{ + "admission-policy-engine": false, + } + + expectedVerticesToUpdate = []string{ + "admission-policy-engine", + } + + updated, verticesToUpdate = s.RecalculateGraph(logLabels) + assert.Equal(t, true, updated) + + _, diff, err = s.GetGraphState(logLabels) + assert.NoError(t, err) + + summary, err = s.PrintSummary() + assert.NoError(t, err) + + assert.Equal(t, expectedSummary, summary) + assert.Equal(t, expectedDiff, diff) + assert.Equal(t, expectedVerticesToUpdate, verticesToUpdate) + + // finalize + err = os.RemoveAll(tmp) + assert.NoError(t, err) +} + func TestRecalculateGraph(t *testing.T) { values := ` # Default global values section @@ -323,7 +586,6 @@ fooBarEnabled: false flantIntegrationEnabled: true monitoringApplicationsEnabled: true l2LoadBalancerEnabled: false - ` logLabels := map[string]string{"source": "TestRecalculateGraph"} basicModules := []*node_mock.MockModule{ @@ -709,7 +971,7 @@ l2LoadBalancerEnabled: false assert.Equal(t, expected, summary) assert.Equal(t, expectedDiff, diff) assert.Equal(t, expectedVerticesToUpdate, verticesToUpdate) - assert.Equal(t, []string{"failed to execute 'ingress-nginx' module's enabled script: Exit code not 0"}, s.errList) + assert.Equal(t, []string{"ScriptEnabled extender failed to filter ingress-nginx module: failed to execute 'ingress-nginx' module's enabled script: Exit code not 0"}, s.errList) err = os.RemoveAll(tmp) assert.NoError(t, err) diff --git a/pkg/module_manager/scheduler/testdata/015-admission-policy-engine/enabled b/pkg/module_manager/scheduler/testdata/015-admission-policy-engine/enabled new file mode 100755 index 00000000..e69de29b diff --git a/pkg/module_manager/scheduler/testdata/042-kube-dns/enabled b/pkg/module_manager/scheduler/testdata/042-kube-dns/enabled new file mode 100755 index 00000000..e69de29b