From d7c0f84912b5bca8615a59af4bbc8320c37d4900 Mon Sep 17 00:00:00 2001 From: Pavel Okhlopkov Date: Tue, 5 Nov 2024 18:22:50 +0300 Subject: [PATCH] refactor resource monitor Signed-off-by: Pavel Okhlopkov --- .../helm_resources_manager.go | 33 ++++++--- .../resources_monitor.go | 70 +++++++++---------- 2 files changed, 59 insertions(+), 44 deletions(-) diff --git a/pkg/helm_resources_manager/helm_resources_manager.go b/pkg/helm_resources_manager/helm_resources_manager.go index 0ca90b97..74ea75e4 100644 --- a/pkg/helm_resources_manager/helm_resources_manager.go +++ b/pkg/helm_resources_manager/helm_resources_manager.go @@ -106,12 +106,19 @@ func (hm *helmResourcesManager) StartMonitor(moduleName string, manifests []mani log.Debugf("Start helm resources monitor for '%s'", moduleName) hm.StopMonitor(moduleName) - rm := NewResourcesMonitor(hm.ctx, hm.kubeClient, hm.cache, hm.logger.Named("resource-monitor")) - rm.WithModuleName(moduleName) - rm.WithManifests(manifests) - rm.WithDefaultNamespace(defaultNamespace) - rm.WithStatusGetter(lastReleaseStatus) - rm.WithAbsentCb(hm.absentResourcesCallback) + cfg := &ResourceMonitorConfig{ + ModuleName: moduleName, + Manifests: manifests, + DefaultNamespace: defaultNamespace, + HelmStatusGetter: lastReleaseStatus, + AbsentCb: hm.absentResourcesCallback, + KubeClient: hm.kubeClient, + Cache: hm.cache, + + Logger: hm.logger.Named("resource-monitor"), + } + + rm := NewResourcesMonitor(hm.ctx, cfg) hm.monitors[moduleName] = rm rm.Start() @@ -183,8 +190,16 @@ func (hm *helmResourcesManager) GetMonitor(moduleName string) *ResourcesMonitor } func (hm *helmResourcesManager) GetAbsentResources(manifests []manifest.Manifest, defaultNamespace string) ([]manifest.Manifest, error) { - rm := NewResourcesMonitor(hm.ctx, hm.kubeClient, hm.cache, hm.logger.Named("resource-monitor")) - rm.WithManifests(manifests) - rm.WithDefaultNamespace(defaultNamespace) + cfg := &ResourceMonitorConfig{ + Manifests: manifests, + DefaultNamespace: defaultNamespace, + KubeClient: hm.kubeClient, + Cache: hm.cache, + + Logger: hm.logger.Named("resource-monitor"), + } + + rm := NewResourcesMonitor(hm.ctx, cfg) + return rm.AbsentResources() } diff --git a/pkg/helm_resources_manager/resources_monitor.go b/pkg/helm_resources_manager/resources_monitor.go index 03e58acf..b9a2d282 100644 --- a/pkg/helm_resources_manager/resources_monitor.go +++ b/pkg/helm_resources_manager/resources_monitor.go @@ -3,7 +3,6 @@ package helm_resources_manager import ( "context" "fmt" - "log/slog" "math/rand" "sort" "sync" @@ -15,13 +14,27 @@ import ( cr_cache "sigs.k8s.io/controller-runtime/pkg/cache" cr_client "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/flant/addon-operator/pkg/utils" klient "github.com/flant/kube-client/client" "github.com/flant/kube-client/manifest" ) const monitorDelayBase = time.Minute*4 + time.Second*30 +type ResourceMonitorConfig struct { + ModuleName string + Manifests []manifest.Manifest + DefaultNamespace string + + KubeClient *klient.Client + Cache cr_cache.Cache + + AbsentCb func(moduleName string, unexpectedStatus bool, absent []manifest.Manifest, defaultNs string) + + HelmStatusGetter func(releaseName string) (revision string, status string, err error) + + Logger *log.Logger +} + type ResourcesMonitor struct { ctx context.Context cancel context.CancelFunc @@ -41,48 +54,35 @@ type ResourcesMonitor struct { logger *log.Logger } -func NewResourcesMonitor(ctx context.Context, kclient *klient.Client, cache cr_cache.Cache, logger *log.Logger) *ResourcesMonitor { +func NewResourcesMonitor(ctx context.Context, cfg *ResourceMonitorConfig) *ResourcesMonitor { cctx, cancel := context.WithCancel(ctx) - return &ResourcesMonitor{ - paused: false, - manifests: make([]manifest.Manifest, 0), - ctx: cctx, - cancel: cancel, - kubeClient: kclient, - cache: cache, - logger: logger.With("operator.component", "HelmResourceMonitor"), - } -} -func (r *ResourcesMonitor) Stop() { - if r.cancel != nil { - r.cancel() + if len(cfg.Manifests) == 0 { + cfg.Manifests = make([]manifest.Manifest, 0) } -} - -func (r *ResourcesMonitor) WithLogLabels(logLabels map[string]string) { - r.logger = utils.EnrichLoggerWithLabels(r.logger, logLabels) -} -func (r *ResourcesMonitor) WithModuleName(name string) { - r.moduleName = name - r.logger = r.logger.With(slog.String("module", name)) -} + return &ResourcesMonitor{ + paused: false, + ctx: cctx, + cancel: cancel, -func (r *ResourcesMonitor) WithDefaultNamespace(ns string) { - r.defaultNamespace = ns -} + kubeClient: cfg.KubeClient, + cache: cfg.Cache, -func (r *ResourcesMonitor) WithManifests(manifests []manifest.Manifest) { - r.manifests = manifests -} + moduleName: cfg.ModuleName, + defaultNamespace: cfg.DefaultNamespace, + manifests: cfg.Manifests, + absentCb: cfg.AbsentCb, + helmStatusGetter: cfg.HelmStatusGetter, -func (r *ResourcesMonitor) WithAbsentCb(cb func(string, bool, []manifest.Manifest, string)) { - r.absentCb = cb + logger: cfg.Logger.With("operator.component", "HelmResourceMonitor"), + } } -func (r *ResourcesMonitor) WithStatusGetter(lastReleaseStatus func(releaseName string) (revision string, status string, err error)) { - r.helmStatusGetter = lastReleaseStatus +func (r *ResourcesMonitor) Stop() { + if r.cancel != nil { + r.cancel() + } } // Start creates a timer and check if all deployed manifests are present in the cluster.