Skip to content

Commit

Permalink
update scheduler logic regarding terminator extenders (#489)
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
  • Loading branch information
miklezzzz authored Jul 12, 2024
1 parent 746f6f2 commit faa8675
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 58 deletions.
32 changes: 8 additions & 24 deletions pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,39 +291,39 @@ func (mm *ModuleManager) Init() error {

gv, err := mm.loadGlobalValues()
if err != nil {
return err
return fmt.Errorf("couldn't load global values: %w", err)
}

staticExtender, err := static_extender.NewExtender(mm.ModulesDir)
if err != nil {
return err
return fmt.Errorf("couldn't create static extender: %w", err)
}
if err := mm.moduleScheduler.AddExtender(staticExtender); err != nil {
return err
return fmt.Errorf("couldn't add static extender: %w", err)
}

err = mm.registerGlobalModule(gv.globalValues, gv.configSchema, gv.valuesSchema)
if err != nil {
return err
return fmt.Errorf("couldn't register global module: %w", err)
}

kubeConfigExtender := kube_config_extender.NewExtender(mm.dependencies.KubeConfigManager)
if err := mm.moduleScheduler.AddExtender(kubeConfigExtender); err != nil {
return err
return fmt.Errorf("couldn't add kube config extender: %w", err)
}

scriptEnabledExtender, err := script_extender.NewExtender(mm.TempDir)
if err != nil {
return err
return fmt.Errorf("couldn't create script_enabled extender: %w", err)
}

if err := mm.moduleScheduler.AddExtender(scriptEnabledExtender); err != nil {
return err
return fmt.Errorf("couldn't add scrpt_enabled extender: %w", err)
}

// by this point we must have all required scheduler extenders attached
if err := mm.moduleScheduler.ApplyExtenders(app.AppliedExtenders); err != nil {
return err
return fmt.Errorf("couldn't apply extenders to the module scheduler: %w", err)
}

return mm.registerModules(scriptEnabledExtender)
Expand Down Expand Up @@ -645,22 +645,6 @@ func (mm *ModuleManager) RunModuleHook(moduleName, hookName string, binding Bind

func (mm *ModuleManager) HandleKubeEvent(kubeEvent KubeEvent, createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo), createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo)) {
mm.LoopByBinding(OnKubernetesEvent, func(gh *hooks.GlobalHook, m *modules.BasicModule, mh *hooks.ModuleHook) {
defer func() {
if err := recover(); err != nil {
logEntry := log.WithField("function", "HandleKubeEvent").WithField("event", "OnKubernetesEvent")

if gh != nil {
logEntry.WithField("GlobalHook name", gh.GetName()).WithField("GlobakHook path", gh.GetPath())
}

if mh != nil {
logEntry.WithField("ModuleHook name", mh.GetName()).WithField("ModuleHook path", mh.GetPath())
}

logEntry.Errorf("panic occurred: %s", err)
}
}()

if gh != nil {
if gh.GetHookController().CanHandleKubeEvent(kubeEvent) {
gh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
Expand Down
102 changes: 102 additions & 0 deletions pkg/module_manager/scheduler/extenders/mock/extenders_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// a bunch of mocked extenders for tests
package extenders_mock

import "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders"

type FilterOne struct{}

func (f *FilterOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterOne")
}

func (f *FilterOne) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterOne) IsTerminator() bool {
return false
}

type FilterTwo struct{}

func (f *FilterTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterTwo")
}

func (f *FilterTwo) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterTwo) IsTerminator() bool {
return false
}

type FilterThree struct{}

func (f *FilterThree) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterThree")
}

func (f *FilterThree) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterThree) IsTerminator() bool {
return false
}

type FilterFour struct{}

func (f *FilterFour) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterOne")
}

func (f *FilterFour) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterFour) IsTerminator() bool {
return false
}

type TerminatorOne struct{}

func (f *TerminatorOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorOne")
}

func (f *TerminatorOne) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TerminatorOne) IsTerminator() bool {
return true
}

type TerminatorTwo struct{}

func (f *TerminatorTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorTwo")
}

func (f *TerminatorTwo) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TerminatorTwo) IsTerminator() bool {
return true
}

type TerminatorThree struct{}

func (f *TerminatorThree) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorThree")
}

func (f *TerminatorThree) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TerminatorThree) IsTerminator() bool {
return true
}
92 changes: 60 additions & 32 deletions pkg/module_manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ var defaultAppliedExtenders = []extenders.ExtenderName{
script_extender.Name,
}

type extenderContainer struct {
ext extenders.Extender
filterAhead bool
}

type Scheduler struct {
ctx context.Context

// list of extenders to cycle over on a run
extenders []extenders.Extender
extenders []extenderContainer
extCh chan extenders.ExtenderEvent
// graph visualization
graphImage image.Image
Expand Down Expand Up @@ -66,7 +71,7 @@ func NewScheduler(ctx context.Context) *Scheduler {
}
return &Scheduler{
ctx: ctx,
extenders: make([]extenders.Extender, 0),
extenders: make([]extenderContainer, 0),
extCh: make(chan extenders.ExtenderEvent, 1),
dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic()),
diff: make(map[string]bool),
Expand Down Expand Up @@ -222,8 +227,8 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {
appliedExtenders = defaultAppliedExtenders
} else {
availableExtenders := make(map[extenders.ExtenderName]bool, len(s.extenders))
for _, ext := range s.extenders {
availableExtenders[ext.Name()] = true
for _, e := range s.extenders {
availableExtenders[e.ext.Name()] = true
}

extendersFromEnv := strings.Split(extendersEnv, ",")
Expand All @@ -246,12 +251,12 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {
}
}

newExtenders := []extenders.Extender{}
newExtenders := []extenderContainer{}
for _, appliedExt := range appliedExtenders {
for _, ext := range s.extenders {
if ext.Name() == appliedExt {
newExtenders = append(newExtenders, ext)
if ne, ok := ext.(extenders.NotificationExtender); ok {
for _, e := range s.extenders {
if e.ext.Name() == appliedExt {
newExtenders = append(newExtenders, e)
if ne, ok := e.ext.(extenders.NotificationExtender); ok {
ne.SetNotifyChannel(s.ctx, s.extCh)
}
break
Expand All @@ -261,24 +266,38 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {

s.extenders = newExtenders

// set some extenders' meta
s.setExtendersMeta()

finalList := []extenders.ExtenderName{}
for _, ext := range s.extenders {
finalList = append(finalList, ext.Name())
for _, e := range s.extenders {
finalList = append(finalList, e.ext.Name())
}

log.Infof("The list of applied module extenders: [%s]", finalList)
log.Infof("The list of applied module extenders: %s", finalList)
return nil
}

// setExtendersMeta and some extra meta to the extenders that lets terminators know if there are any filtering extenders left in the list
func (s *Scheduler) setExtendersMeta() {
var filterAhead bool
for i := len(s.extenders) - 1; i >= 0; i-- {
s.extenders[i].filterAhead = filterAhead
if !filterAhead && !s.extenders[i].ext.IsTerminator() {
filterAhead = true
}
}
}

// AddExtender adds a new extender to the slice of the extenders that are used to determine modules' states
func (s *Scheduler) AddExtender(ext extenders.Extender) error {
for _, ex := range s.extenders {
if ex.Name() == ext.Name() {
for _, e := range s.extenders {
if e.ext.Name() == ext.Name() {
return fmt.Errorf("extender %s already added", ext.Name())
}
}

s.extenders = append(s.extenders, ext)
s.extenders = append(s.extenders, extenderContainer{ext: ext})
return nil
}

Expand Down Expand Up @@ -403,9 +422,9 @@ func (s *Scheduler) RecalculateGraph(logLabels map[string]string) (bool, []strin

// Filter returns filtering result for the specified extender and module
func (s *Scheduler) Filter(extName extenders.ExtenderName, moduleName string, logLabels map[string]string) (*bool, error) {
for _, ex := range s.extenders {
if ex.Name() == extName {
return ex.Filter(moduleName, logLabels)
for _, e := range s.extenders {
if e.ext.Name() == extName {
return e.ext.Filter(moduleName, logLabels)
}
}
return nil, fmt.Errorf("extender %s not found", extName)
Expand All @@ -424,7 +443,7 @@ func (s *Scheduler) recalculateGraphState(logLabels map[string]string) ( /* Grap

names, err := graph.StableTopologicalSort(s.dag, moduleSortFunc)
if err != nil {
errList = append(errList, err.Error())
errList = append(errList, fmt.Sprintf("couldn't perform stable topological sort: %s", err.Error()))
s.errList = errList
return true, updByDiff
}
Expand All @@ -444,31 +463,40 @@ outerCycle:
moduleName := vertex.GetName()
vBuf[moduleName] = &vertexState{}

for _, ex := range s.extenders {
// if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against a terminator
if ok := ex.IsTerminator(); ok && !vBuf[moduleName].enabled {
continue
for _, e := range s.extenders {
// if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against all other terminators
if e.ext.IsTerminator() && !vBuf[moduleName].enabled && !e.filterAhead {
break
}

moduleStatus, err := ex.Filter(moduleName, logLabels)
moduleStatus, err := e.ext.Filter(moduleName, logLabels)
if err != nil {
if permanent, ok := err.(*exerror.PermanentError); ok {
errList = append(errList, permanent.Error())
errList = append(errList, fmt.Sprintf("%s extender failed to filter %s module: %s", e.ext.Name(), moduleName, permanent.Error()))
break outerCycle
}
}

if moduleStatus != nil {
// if current extender is a terminating one and it says to disable - stop cycling over remaining extenders and disable the module
if ok := ex.IsTerminator(); ok {
if !*moduleStatus && vBuf[moduleName].enabled {
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(ex.Name())
if e.ext.IsTerminator() {
// if disabled - terminate filtering
if !*moduleStatus {
if vBuf[moduleName].enabled || e.filterAhead {
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(e.ext.Name())
}
break
}

// if enabled and there are some other filtering extenders ahead - continue filtering
if e.filterAhead {
continue
}
break
}
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(ex.Name())
vBuf[moduleName].updatedBy = string(e.ext.Name())
}
}

Expand All @@ -487,8 +515,8 @@ outerCycle:
}

// reset extenders' states if needed (mostly for enabled_script extender)
for _, ex := range s.extenders {
if re, ok := ex.(extenders.ResettableExtender); ok {
for _, e := range s.extenders {
if re, ok := e.ext.(extenders.ResettableExtender); ok {
re.Reset()
}
}
Expand Down
Loading

0 comments on commit faa8675

Please sign in to comment.