Skip to content

Commit

Permalink
refactor resource monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
  • Loading branch information
Pavel Okhlopkov committed Nov 5, 2024
1 parent 520722d commit d7c0f84
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 44 deletions.
33 changes: 24 additions & 9 deletions pkg/helm_resources_manager/helm_resources_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,19 @@ func (hm *helmResourcesManager) StartMonitor(moduleName string, manifests []mani
log.Debugf("Start helm resources monitor for '%s'", moduleName)
hm.StopMonitor(moduleName)

rm := NewResourcesMonitor(hm.ctx, hm.kubeClient, hm.cache, hm.logger.Named("resource-monitor"))
rm.WithModuleName(moduleName)
rm.WithManifests(manifests)
rm.WithDefaultNamespace(defaultNamespace)
rm.WithStatusGetter(lastReleaseStatus)
rm.WithAbsentCb(hm.absentResourcesCallback)
cfg := &ResourceMonitorConfig{
ModuleName: moduleName,
Manifests: manifests,
DefaultNamespace: defaultNamespace,
HelmStatusGetter: lastReleaseStatus,
AbsentCb: hm.absentResourcesCallback,
KubeClient: hm.kubeClient,
Cache: hm.cache,

Logger: hm.logger.Named("resource-monitor"),
}

rm := NewResourcesMonitor(hm.ctx, cfg)

hm.monitors[moduleName] = rm
rm.Start()
Expand Down Expand Up @@ -183,8 +190,16 @@ func (hm *helmResourcesManager) GetMonitor(moduleName string) *ResourcesMonitor
}

func (hm *helmResourcesManager) GetAbsentResources(manifests []manifest.Manifest, defaultNamespace string) ([]manifest.Manifest, error) {
rm := NewResourcesMonitor(hm.ctx, hm.kubeClient, hm.cache, hm.logger.Named("resource-monitor"))
rm.WithManifests(manifests)
rm.WithDefaultNamespace(defaultNamespace)
cfg := &ResourceMonitorConfig{
Manifests: manifests,
DefaultNamespace: defaultNamespace,
KubeClient: hm.kubeClient,
Cache: hm.cache,

Logger: hm.logger.Named("resource-monitor"),
}

rm := NewResourcesMonitor(hm.ctx, cfg)

return rm.AbsentResources()
}
70 changes: 35 additions & 35 deletions pkg/helm_resources_manager/resources_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package helm_resources_manager
import (
"context"
"fmt"
"log/slog"
"math/rand"
"sort"
"sync"
Expand All @@ -15,13 +14,27 @@ import (
cr_cache "sigs.k8s.io/controller-runtime/pkg/cache"
cr_client "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flant/addon-operator/pkg/utils"
klient "github.com/flant/kube-client/client"
"github.com/flant/kube-client/manifest"
)

const monitorDelayBase = time.Minute*4 + time.Second*30

type ResourceMonitorConfig struct {
ModuleName string
Manifests []manifest.Manifest
DefaultNamespace string

KubeClient *klient.Client
Cache cr_cache.Cache

AbsentCb func(moduleName string, unexpectedStatus bool, absent []manifest.Manifest, defaultNs string)

HelmStatusGetter func(releaseName string) (revision string, status string, err error)

Logger *log.Logger
}

type ResourcesMonitor struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -41,48 +54,35 @@ type ResourcesMonitor struct {
logger *log.Logger
}

func NewResourcesMonitor(ctx context.Context, kclient *klient.Client, cache cr_cache.Cache, logger *log.Logger) *ResourcesMonitor {
func NewResourcesMonitor(ctx context.Context, cfg *ResourceMonitorConfig) *ResourcesMonitor {
cctx, cancel := context.WithCancel(ctx)
return &ResourcesMonitor{
paused: false,
manifests: make([]manifest.Manifest, 0),
ctx: cctx,
cancel: cancel,
kubeClient: kclient,
cache: cache,
logger: logger.With("operator.component", "HelmResourceMonitor"),
}
}

func (r *ResourcesMonitor) Stop() {
if r.cancel != nil {
r.cancel()
if len(cfg.Manifests) == 0 {
cfg.Manifests = make([]manifest.Manifest, 0)
}
}

func (r *ResourcesMonitor) WithLogLabels(logLabels map[string]string) {
r.logger = utils.EnrichLoggerWithLabels(r.logger, logLabels)
}

func (r *ResourcesMonitor) WithModuleName(name string) {
r.moduleName = name
r.logger = r.logger.With(slog.String("module", name))
}
return &ResourcesMonitor{
paused: false,
ctx: cctx,
cancel: cancel,

func (r *ResourcesMonitor) WithDefaultNamespace(ns string) {
r.defaultNamespace = ns
}
kubeClient: cfg.KubeClient,
cache: cfg.Cache,

func (r *ResourcesMonitor) WithManifests(manifests []manifest.Manifest) {
r.manifests = manifests
}
moduleName: cfg.ModuleName,
defaultNamespace: cfg.DefaultNamespace,
manifests: cfg.Manifests,
absentCb: cfg.AbsentCb,
helmStatusGetter: cfg.HelmStatusGetter,

func (r *ResourcesMonitor) WithAbsentCb(cb func(string, bool, []manifest.Manifest, string)) {
r.absentCb = cb
logger: cfg.Logger.With("operator.component", "HelmResourceMonitor"),
}
}

func (r *ResourcesMonitor) WithStatusGetter(lastReleaseStatus func(releaseName string) (revision string, status string, err error)) {
r.helmStatusGetter = lastReleaseStatus
func (r *ResourcesMonitor) Stop() {
if r.cancel != nil {
r.cancel()
}
}

// Start creates a timer and check if all deployed manifests are present in the cluster.
Expand Down

0 comments on commit d7c0f84

Please sign in to comment.