From 56cf3a11b6d3be59fdd734a71dac78e282e916d5 Mon Sep 17 00:00:00 2001 From: Abhishek Dasgupta Date: Thu, 11 Nov 2021 14:06:24 +0530 Subject: [PATCH] Addressed Tim and Shreyas's reviews of 10 Nov. --- controllers/config/compaction.go | 2 +- controllers/controller_ref_manager.go | 5 +- controllers/controllers_suite_test.go | 2 - controllers/etcd_controller.go | 107 +++--------- controllers/etcd_controller_test.go | 169 +++++++++++++++++++ controllers/lease_controller.go | 34 ++-- controllers/lease_controller_test.go | 225 ++++++-------------------- main.go | 6 +- pkg/predicate/predicate.go | 31 ++++ pkg/predicate/predicate_test.go | 63 ++++++++ 10 files changed, 358 insertions(+), 286 deletions(-) diff --git a/controllers/config/compaction.go b/controllers/config/compaction.go index dd7574340..091593e8e 100644 --- a/controllers/config/compaction.go +++ b/controllers/config/compaction.go @@ -20,6 +20,6 @@ import "time" type CompactionConfig struct { // ActiveDeadlineDuration is the duration after which a running compaction job will be killed (Ex: "300ms", "20s", "-1.5h" or "2h45m") ActiveDeadlineDuration time.Duration - // EventsThreshold is Total number of events that can be allowed before a compaction job is triggered + // EventsThreshold is total number of etcd events that can be allowed before a backup compaction job is triggered EventsThreshold int64 } diff --git a/controllers/controller_ref_manager.go b/controllers/controller_ref_manager.go index 6cb837dd5..f6b44be58 100644 --- a/controllers/controller_ref_manager.go +++ b/controllers/controller_ref_manager.go @@ -584,7 +584,6 @@ func getMapFromEtcd(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (map[s var enableProfiling = false if etcd.Spec.Backup.EnableProfiling != nil { enableProfiling = *etcd.Spec.Backup.EnableProfiling - } backupValues := map[string]interface{}{ @@ -692,8 +691,8 @@ func getMapFromEtcd(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (map[s "statefulsetReplicas": statefulsetReplicas, "serviceName": fmt.Sprintf("%s-client", etcd.Name), "configMapName": fmt.Sprintf("etcd-bootstrap-%s", string(etcd.UID[:6])), - "fullSnapLeaseName": getFullLease(etcd), - "deltaSnapLeaseName": getDeltaLease(etcd), + "fullSnapLeaseName": getFullSnapshotLeaseName(etcd), + "deltaSnapLeaseName": getDeltaSnapshotLeaseName(etcd), "jobName": getJobName(etcd), "volumeClaimTemplateName": volumeClaimTemplateName, "serviceAccountName": getServiceAccountName(etcd), diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go index 11dc47e0c..c8a666d79 100644 --- a/controllers/controllers_suite_test.go +++ b/controllers/controllers_suite_test.go @@ -114,8 +114,6 @@ var _ = BeforeSuite(func(done Done) { }, }) - Expect(err).NotTo(HaveOccurred()) - err = custodian.SetupWithManager(mgrCtx, mgr, 1) Expect(err).NotTo(HaveOccurred()) diff --git a/controllers/etcd_controller.go b/controllers/etcd_controller.go index 57c61006b..1f1b3ca0b 100644 --- a/controllers/etcd_controller.go +++ b/controllers/etcd_controller.go @@ -878,7 +878,7 @@ func (r *EtcdReconciler) getStatefulSetFromEtcd(etcd *druidv1alpha1.Etcd, values func (r *EtcdReconciler) reconcileFullLease(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (*coordinationv1.Lease, error) { // Get or Create fullSnapshotRevisions lease object that will help to set BackupReady condition - fullSnapshotRevisions := getFullLease(etcd) + fullSnapshotRevisions := getFullSnapshotLeaseName(etcd) nsName := types.NamespacedName{ Name: fullSnapshotRevisions, Namespace: etcd.Namespace, @@ -892,27 +892,7 @@ func (r *EtcdReconciler) reconcileFullLease(ctx context.Context, logger logr.Log if apierrors.IsNotFound(err1) { logger.Info("Creating the full snap lease " + fullSnapshotRevisions) - renewTime := metav1.NewMicroTime(time.Now()) - fullLease = &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: fullSnapshotRevisions, - Namespace: etcd.Namespace, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "druid.gardener.cloud/v1alpha1", - BlockOwnerDeletion: pointer.BoolPtr(true), - Controller: pointer.BoolPtr(true), - Kind: "Etcd", - Name: etcd.Name, - UID: etcd.UID, - }, - }, - }, - Spec: coordinationv1.LeaseSpec{ - HolderIdentity: pointer.StringPtr("0"), - RenewTime: &renewTime, - }, - } + fullLease = createSnapshotLease(etcd, fullSnapshotRevisions) err2 := r.Create(ctx, fullLease) if err2 != nil { logger.Error(err2, "Full snap lease "+fullSnapshotRevisions+" couldn't be created") @@ -922,36 +902,18 @@ func (r *EtcdReconciler) reconcileFullLease(ctx context.Context, logger logr.Log return nil, err1 } } - if !checkEtcdOwnerReference(fullLease.GetOwnerReferences(), etcd) { - err := kutil.TryPatch(ctx, retry.DefaultBackoff, r.Client, fullLease, func() error { - fullLease.OwnerReferences = []metav1.OwnerReference{ - { - APIVersion: "druid.gardener.cloud/v1alpha1", - BlockOwnerDeletion: pointer.BoolPtr(true), - Controller: pointer.BoolPtr(true), - Kind: "Etcd", - Name: etcd.Name, - UID: etcd.UID, - }, - } - return nil - }) - if err != nil { - logger.Error(err, "Full snap lease found but error occured during claiming it") - return nil, err - } - } + return fullLease, nil } -func getFullLease(etcd *druidv1alpha1.Etcd) string { +func getFullSnapshotLeaseName(etcd *druidv1alpha1.Etcd) string { return fmt.Sprintf("%s-full-snap", string(etcd.Name)) } func (r *EtcdReconciler) reconcileDeltaLease(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (*coordinationv1.Lease, error) { // Get or Create delta_snapshot_revisions lease object that will keep track of delta snapshot revisions based on which // compaction job will be scheduled - deltaSnapshotRevisions := getDeltaLease(etcd) + deltaSnapshotRevisions := getDeltaSnapshotLeaseName(etcd) nsName := types.NamespacedName{ Name: deltaSnapshotRevisions, Namespace: etcd.Namespace, @@ -965,27 +927,7 @@ func (r *EtcdReconciler) reconcileDeltaLease(ctx context.Context, logger logr.Lo if apierrors.IsNotFound(err1) { logger.Info("Creating the delta snap lease " + deltaSnapshotRevisions) - renewTime := metav1.NewMicroTime(time.Now()) - deltaLease = &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: getDeltaLease(etcd), - Namespace: etcd.Namespace, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "druid.gardener.cloud/v1alpha1", - BlockOwnerDeletion: pointer.BoolPtr(true), - Controller: pointer.BoolPtr(true), - Kind: "Etcd", - Name: etcd.Name, - UID: etcd.UID, - }, - }, - }, - Spec: coordinationv1.LeaseSpec{ - HolderIdentity: pointer.StringPtr("0"), - RenewTime: &renewTime, - }, - } + deltaLease = createSnapshotLease(etcd, deltaSnapshotRevisions) err2 := r.Create(ctx, deltaLease) if err2 != nil { logger.Error(err2, "Delta snap lease "+deltaSnapshotRevisions+" couldn't be created") @@ -996,9 +938,20 @@ func (r *EtcdReconciler) reconcileDeltaLease(ctx context.Context, logger logr.Lo } } - if !checkEtcdOwnerReference(deltaLease.GetOwnerReferences(), etcd) { - err := kutil.TryPatch(ctx, retry.DefaultBackoff, r.Client, deltaLease, func() error { - deltaLease.OwnerReferences = []metav1.OwnerReference{ + return deltaLease, nil +} + +func getDeltaSnapshotLeaseName(etcd *druidv1alpha1.Etcd) string { + return fmt.Sprintf("%s-delta-snap", string(etcd.Name)) +} + +func createSnapshotLease(etcd *druidv1alpha1.Etcd, snapshotLeaseName string) *coordinationv1.Lease { + renewTime := metav1.NewMicroTime(time.Now()) + return &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: snapshotLeaseName, + Namespace: etcd.Namespace, + OwnerReferences: []metav1.OwnerReference{ { APIVersion: "druid.gardener.cloud/v1alpha1", BlockOwnerDeletion: pointer.BoolPtr(true), @@ -1007,22 +960,14 @@ func (r *EtcdReconciler) reconcileDeltaLease(ctx context.Context, logger logr.Lo Name: etcd.Name, UID: etcd.UID, }, - } - return nil - }) - if err != nil { - logger.Error(err, "Delta snap lease found but error occured during claiming it") - return nil, err - } + }, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr("0"), + RenewTime: &renewTime, + }, } - - return deltaLease, nil } - -func getDeltaLease(etcd *druidv1alpha1.Etcd) string { - return fmt.Sprintf("%s-delta-snap", string(etcd.Name)) -} - func decodeObject(renderedChart *chartrenderer.RenderedChart, path string, object interface{}) error { if content, ok := renderedChart.Files()[path]; ok { decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(content)), 1024) diff --git a/controllers/etcd_controller_test.go b/controllers/etcd_controller_test.go index d7d143313..c03d2766d 100644 --- a/controllers/etcd_controller_test.go +++ b/controllers/etcd_controller_test.go @@ -22,7 +22,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "github.com/gardener/gardener/pkg/controllerutils" "github.com/gardener/gardener/pkg/utils/imagevector" + kutil "github.com/gardener/gardener/pkg/utils/kubernetes" "github.com/gardener/gardener/pkg/utils/test/matchers" "github.com/ghodss/yaml" @@ -36,6 +38,9 @@ import ( . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -43,6 +48,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -643,6 +649,121 @@ var _ = Describe("Druid", func() { ) }) +var _ = Describe("Cron Job", func() { + Context("when an existing cron job from older version is already present", func() { + var ( + err error + instance *druidv1alpha1.Etcd + c client.Client + cj *batchv1beta1.CronJob + ) + + It("should delete the existing cronjob if older than activeDeadlineDuration", func() { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + instance = getEtcd("foo80", "default", true) + c = mgr.GetClient() + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: instance.Namespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil }) + Expect(err).To(Not(HaveOccurred())) + + if instance.Spec.Backup.Store != nil && instance.Spec.Backup.Store.SecretRef != nil { + storeSecret := instance.Spec.Backup.Store.SecretRef.Name + errors := createSecrets(c, instance.Namespace, storeSecret) + Expect(len(errors)).Should(BeZero()) + } + err = c.Create(context.TODO(), instance) + Expect(err).NotTo(HaveOccurred()) + + // Create CronJob + cj = createCronJob(instance) + cj.Status.LastScheduleTime = &metav1.Time{Time: time.Now().Add(-3 * time.Hour)} + Expect(c.Create(ctx, cj)).To(Succeed()) + Eventually(func() error { return cronJobIsCorrectlyReconciled(c, instance, cj) }, timeout, pollingInterval).Should(BeNil()) + + // Deliberately update the delta lease + deltaLease := &coordinationv1.Lease{} + Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil()) + err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, deltaLease, func() error { + deltaLease.Spec.HolderIdentity = pointer.StringPtr("1000000") + renewedTime := time.Now() + deltaLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime} + return nil + }) + Expect(err).NotTo(HaveOccurred()) + + // Wait until the cron job gets the "foregroundDeletion" finalizer and remove it + Eventually(func() (*batchv1beta1.CronJob, error) { + if err := c.Get(ctx, client.ObjectKeyFromObject(cj), cj); err != nil { + return nil, err + } + return cj, nil + }, timeout, pollingInterval).Should(PointTo(matchFinalizer(metav1.FinalizerDeleteDependents))) + Expect(controllerutils.PatchRemoveFinalizers(ctx, c, cj, metav1.FinalizerDeleteDependents)).To(Succeed()) + + // Wait until the cron job has been deleted + Eventually(func() error { + return c.Get(ctx, client.ObjectKeyFromObject(cj), &batchv1beta1.CronJob{}) + }, timeout, pollingInterval).Should(matchers.BeNotFoundError()) + }) + + It("should let the existing active cronjob run if not older than activeDeadlineDuration", func() { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + instance = getEtcd("foo81", "default", true) + c = mgr.GetClient() + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: instance.Namespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil }) + Expect(err).To(Not(HaveOccurred())) + + if instance.Spec.Backup.Store != nil && instance.Spec.Backup.Store.SecretRef != nil { + storeSecret := instance.Spec.Backup.Store.SecretRef.Name + errors := createSecrets(c, instance.Namespace, storeSecret) + Expect(len(errors)).Should(BeZero()) + } + err = c.Create(context.TODO(), instance) + Expect(err).NotTo(HaveOccurred()) + + // Create CronJob + cj = createCronJob(instance) + Expect(c.Create(ctx, cj)).To(Succeed()) + Eventually(func() error { return cronJobIsCorrectlyReconciled(c, instance, cj) }, timeout, pollingInterval).Should(BeNil()) + + // Deliberately update the delta lease + deltaLease := &coordinationv1.Lease{} + Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil()) + err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, deltaLease, func() error { + deltaLease.Spec.HolderIdentity = pointer.StringPtr("1000000") + renewedTime := time.Now() + deltaLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime} + return nil + }) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() error { return cronJobIsCorrectlyReconciled(c, instance, cj) }, timeout, pollingInterval).Should(BeNil()) + }) + + AfterEach(func() { + Expect(c.Delete(context.TODO(), instance)).To(Succeed()) + Eventually(func() error { + return c.Get(context.TODO(), client.ObjectKeyFromObject(instance), &batchv1beta1.CronJob{}) + }, timeout, pollingInterval).Should(matchers.BeNotFoundError()) + }) + }) +}) + func validateRole(instance *druidv1alpha1.Etcd, role *rbac.Role) { Expect(*role).To(MatchFields(IgnoreExtras, Fields{ "ObjectMeta": MatchFields(IgnoreExtras, Fields{ @@ -2391,6 +2512,54 @@ func setStatefulSetReady(s *appsv1.StatefulSet) { s.Status.ReadyReplicas = replicas } +func cronJobIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, cj *batchv1beta1.CronJob) error { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + req := types.NamespacedName{ + Name: getCronJobName(instance), + Namespace: instance.Namespace, + } + + if err := c.Get(ctx, req, cj); err != nil { + return err + } + return nil +} + +func createCronJob(instance *druidv1alpha1.Etcd) *batchv1beta1.CronJob { + cj := batchv1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: getCronJobName(instance), + Namespace: instance.Namespace, + Labels: instance.Labels, + }, + Spec: batchv1beta1.CronJobSpec{ + Schedule: backupCompactionSchedule, + ConcurrencyPolicy: "Forbid", + JobTemplate: batchv1beta1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: instance.Labels, + }, + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "compact-backup", + Image: "eu.gcr.io/gardener-project/alpine:3.14", + Command: []string{"sh", "-c", "tail -f /dev/null"}, + }, + }, + }, + }, + }, + }, + }, + } + return &cj +} + var _ = Describe("buildPredicate", func() { var ( etcd *druidv1alpha1.Etcd diff --git a/controllers/lease_controller.go b/controllers/lease_controller.go index 937d64cf4..5fd38648d 100644 --- a/controllers/lease_controller.go +++ b/controllers/lease_controller.go @@ -26,7 +26,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -34,19 +33,21 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1" controllersconfig "github.com/gardener/etcd-druid/controllers/config" "github.com/gardener/etcd-druid/pkg/common" + druidpredicates "github.com/gardener/etcd-druid/pkg/predicate" "github.com/gardener/etcd-druid/pkg/utils" "github.com/gardener/gardener/pkg/utils/imagevector" kutil "github.com/gardener/gardener/pkg/utils/kubernetes" ) +const DefaultETCDQuota = 8 * 1024 * 1024 * 1024 // 8Gi // LeaseController reconciles compaction job type LeaseController struct { client.Client - Scheme *runtime.Scheme logger logr.Logger ImageVector imagevector.ImageVector config controllersconfig.CompactionConfig @@ -56,7 +57,6 @@ type LeaseController struct { func NewLeaseController(mgr manager.Manager, config controllersconfig.CompactionConfig) *LeaseController { return &LeaseController{ Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), logger: log.Log.WithName("lease-controller"), config: config, } @@ -108,7 +108,7 @@ func (lc *LeaseController) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Get delta snapshot lease to check the HolderIdentity value to take decision on compaction job nsName := types.NamespacedName{ - Name: getFullLease(etcd), + Name: getFullSnapshotLeaseName(etcd), Namespace: etcd.Namespace, } @@ -123,7 +123,7 @@ func (lc *LeaseController) Reconcile(ctx context.Context, req ctrl.Request) (ctr } nsName = types.NamespacedName{ - Name: getDeltaLease(etcd), + Name: getDeltaSnapshotLeaseName(etcd), Namespace: etcd.Namespace, } @@ -157,8 +157,7 @@ func (lc *LeaseController) Reconcile(ctx context.Context, req ctrl.Request) (ctr diff := delta - full - // Reconcile job only when current revision is more than 1 million (this vale is configurable through `events-threshold` option in druid) than - // the last snapshot (full/compact) revision + // Reconcile job only when number of accumulated revisions over the last full snapshot is more than the configured threshold value via 'events-threshold' flag if diff >= lc.config.EventsThreshold { return lc.reconcileJob(ctx, logger, etcd) } @@ -204,7 +203,7 @@ func (lc *LeaseController) reconcileJob(ctx context.Context, logger logr.Logger, } return ctrl.Result{ RequeueAfter: 10 * time.Second, - }, fmt.Errorf("existing job status failed") + }, nil } // Delete job and return if the job succeeded @@ -225,7 +224,7 @@ func (lc *LeaseController) delete(ctx context.Context, logger logr.Logger, etcd return ctrl.Result{Requeue: false}, nil } - if job.Name == getJobName(etcd) && job.DeletionTimestamp == nil { + if job.DeletionTimestamp == nil { logger.Info("Deleting job", "job", kutil.ObjectName(job)) if err := client.IgnoreNotFound(lc.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationForeground))); err != nil { return ctrl.Result{ @@ -495,10 +494,10 @@ func getCompactJobCommands(etcd *druidv1alpha1.Etcd) []string { command = append(command, "--data-dir=/var/etcd/data") command = append(command, "--snapstore-temp-directory=/var/etcd/data/tmp") command = append(command, "--enable-snapshot-lease-renewal=true") - command = append(command, "--full-snapshot-lease-name="+getFullLease(etcd)) - command = append(command, "--delta-snapshot-lease-name="+getDeltaLease(etcd)) + command = append(command, "--full-snapshot-lease-name="+getFullSnapshotLeaseName(etcd)) + command = append(command, "--delta-snapshot-lease-name="+getDeltaSnapshotLeaseName(etcd)) - var quota int64 = 8 * 1024 * 1024 * 1024 // 8Gi + var quota int64 = DefaultETCDQuota if etcd.Spec.Etcd.Quota != nil { quota = etcd.Spec.Etcd.Quota.Value() } @@ -537,8 +536,11 @@ func (lc *LeaseController) SetupWithManager(mgr ctrl.Manager, workers int) error MaxConcurrentReconciles: workers, }) - return builder. - For(&druidv1alpha1.Etcd{}). - Owns(&coordinationv1.Lease{}). - Complete(lc) + builder = builder.WithEventFilter(buildPredicateForLC()).For(&druidv1alpha1.Etcd{}) + builder = builder.Owns(&coordinationv1.Lease{}) + return builder.Complete(lc) +} + +func buildPredicateForLC() predicate.Predicate { + return druidpredicates.LeaseHolderIdentityChange() } diff --git a/controllers/lease_controller_test.go b/controllers/lease_controller_test.go index 1c8df9fae..f4062d005 100644 --- a/controllers/lease_controller_test.go +++ b/controllers/lease_controller_test.go @@ -29,7 +29,6 @@ import ( . "github.com/onsi/gomega/gstruct" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,12 +41,14 @@ import ( var _ = Describe("Lease Controller", func() { Context("when fields are not set in etcd.Spec", func() { - var err error - var instance *druidv1alpha1.Etcd - var c client.Client - var s *appsv1.StatefulSet - var cm *corev1.ConfigMap - var svc *corev1.Service + var ( + err error + instance *druidv1alpha1.Etcd + c client.Client + s *appsv1.StatefulSet + cm *corev1.ConfigMap + svc *corev1.Service + ) BeforeEach(func() { instance = getEtcdWithDefault("foo333", "default") c = mgr.GetClient() @@ -104,10 +105,12 @@ var _ = Describe("Lease Controller", func() { func(name string, generateEtcd func(string, string) *druidv1alpha1.Etcd, validateETCDCmpctJob func(*druidv1alpha1.Etcd, *batchv1.Job)) { - var err error - var instance *druidv1alpha1.Etcd - var c client.Client - var j *batchv1.Job + var ( + err error + instance *druidv1alpha1.Etcd + c client.Client + j *batchv1.Job + ) instance = generateEtcd(name, "default") c = mgr.GetClient() @@ -168,10 +171,12 @@ var _ = Describe("Lease Controller", func() { ) Context("when an existing job is already present", func() { - var err error - var instance *druidv1alpha1.Etcd - var c client.Client - var ns corev1.Namespace + var ( + err error + instance *druidv1alpha1.Etcd + c client.Client + ns corev1.Namespace + ) BeforeEach(func() { @@ -224,7 +229,22 @@ var _ = Describe("Lease Controller", func() { }) Expect(err).NotTo(HaveOccurred()) + // Wait until the job gets the "foregroundDeletion" finalizer and remove it + Eventually(func() (*batchv1.Job, error) { + if err := c.Get(ctx, client.ObjectKeyFromObject(j), j); err != nil { + return nil, err + } + return j, nil + }, timeout, pollingInterval).Should(PointTo(matchFinalizer(metav1.FinalizerDeleteDependents))) + Expect(controllerutils.PatchRemoveFinalizers(ctx, c, j, metav1.FinalizerDeleteDependents)).To(Succeed()) + + // Wait until the job has been deleted + Eventually(func() error { + return c.Get(ctx, client.ObjectKeyFromObject(j), &batchv1.Job{}) + }, timeout, pollingInterval).Should(matchers.BeNotFoundError()) + //Instead of failed one a new job should be created + j = &batchv1.Job{} Eventually(func() error { return jobIsCorrectlyReconciled(c, instance, j) }, timeout, pollingInterval).Should(BeNil()) }) @@ -346,119 +366,6 @@ var _ = Describe("Lease Controller", func() { Eventually(func() error { return etcdRemoved(c, instance) }, timeout, pollingInterval).Should(BeNil()) }) }) - Context("when an existing cron job from older version is already present", func() { - var err error - var instance *druidv1alpha1.Etcd - var c client.Client - var cj *batchv1beta1.CronJob - BeforeEach(func() { - - }) - - It("should delete the existing cronjob if older than activeDeadlineDuration", func() { - ctx, cancel := context.WithTimeout(context.TODO(), timeout) - defer cancel() - - instance = getEtcd("foo80", "default", true) - c = mgr.GetClient() - ns := corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: instance.Namespace, - }, - } - - _, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil }) - Expect(err).To(Not(HaveOccurred())) - - if instance.Spec.Backup.Store != nil && instance.Spec.Backup.Store.SecretRef != nil { - storeSecret := instance.Spec.Backup.Store.SecretRef.Name - errors := createSecrets(c, instance.Namespace, storeSecret) - Expect(len(errors)).Should(BeZero()) - } - err = c.Create(context.TODO(), instance) - Expect(err).NotTo(HaveOccurred()) - - // Create CronJob - cj = createCronJob(instance) - cj.Status.LastScheduleTime = &metav1.Time{Time: time.Now().Add(-3 * time.Hour)} - Expect(c.Create(ctx, cj)).To(Succeed()) - Eventually(func() error { return cronJobIsCorrectlyReconciled(c, instance, cj) }, timeout, pollingInterval).Should(BeNil()) - - // Deliberately update the delta lease - deltaLease := &coordinationv1.Lease{} - Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil()) - err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, deltaLease, func() error { - deltaLease.Spec.HolderIdentity = pointer.StringPtr("1000000") - renewedTime := time.Now() - deltaLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime} - return nil - }) - Expect(err).NotTo(HaveOccurred()) - - // Wait until the cron job gets the "foregroundDeletion" finalizer and remove it - Eventually(func() (*batchv1beta1.CronJob, error) { - if err := c.Get(ctx, client.ObjectKeyFromObject(cj), cj); err != nil { - return nil, err - } - return cj, nil - }, timeout, pollingInterval).Should(PointTo(matchFinalizer(metav1.FinalizerDeleteDependents))) - Expect(controllerutils.PatchRemoveFinalizers(ctx, c, cj, metav1.FinalizerDeleteDependents)).To(Succeed()) - - // Wait until the cron job has been deleted - Eventually(func() error { - return c.Get(ctx, client.ObjectKeyFromObject(cj), &batchv1beta1.CronJob{}) - }, timeout, pollingInterval).Should(matchers.BeNotFoundError()) - }) - - It("should let the existing active cronjob run if not older than activeDeadlineDuration", func() { - ctx, cancel := context.WithTimeout(context.TODO(), timeout) - defer cancel() - - instance = getEtcd("foo81", "default", true) - c = mgr.GetClient() - ns := corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: instance.Namespace, - }, - } - - _, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil }) - Expect(err).To(Not(HaveOccurred())) - - if instance.Spec.Backup.Store != nil && instance.Spec.Backup.Store.SecretRef != nil { - storeSecret := instance.Spec.Backup.Store.SecretRef.Name - errors := createSecrets(c, instance.Namespace, storeSecret) - Expect(len(errors)).Should(BeZero()) - } - err = c.Create(context.TODO(), instance) - Expect(err).NotTo(HaveOccurred()) - - // Create CronJob - cj = createCronJob(instance) - Expect(c.Create(ctx, cj)).To(Succeed()) - Eventually(func() error { return cronJobIsCorrectlyReconciled(c, instance, cj) }, timeout, pollingInterval).Should(BeNil()) - - // Deliberately update the delta lease - deltaLease := &coordinationv1.Lease{} - Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil()) - err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, deltaLease, func() error { - deltaLease.Spec.HolderIdentity = pointer.StringPtr("1000000") - renewedTime := time.Now() - deltaLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime} - return nil - }) - Expect(err).NotTo(HaveOccurred()) - - Eventually(func() error { return cronJobIsCorrectlyReconciled(c, instance, cj) }, timeout, pollingInterval).Should(BeNil()) - }) - - AfterEach(func() { - Expect(c.Delete(context.TODO(), instance)).To(Succeed()) - Eventually(func() error { - return c.Get(context.TODO(), client.ObjectKeyFromObject(instance), &batchv1beta1.CronJob{}) - }, timeout, pollingInterval).Should(matchers.BeNotFoundError()) - }) - }) }) func validateEtcdForCmpctJob(instance *druidv1alpha1.Etcd, j *batchv1.Job) { @@ -491,8 +398,8 @@ func validateEtcdForCmpctJob(instance *druidv1alpha1.Etcd, j *batchv1.Job) { "--data-dir=/var/etcd/data": Equal("--data-dir=/var/etcd/data"), "--snapstore-temp-directory=/var/etcd/data/tmp": Equal("--snapstore-temp-directory=/var/etcd/data/tmp"), "--enable-snapshot-lease-renewal=true": Equal("--enable-snapshot-lease-renewal=true"), - fmt.Sprintf("%s=%s", "--full-snapshot-lease-name", getFullLease(instance)): Equal(fmt.Sprintf("%s=%s", "--full-snapshot-lease-name", getFullLease(instance))), - fmt.Sprintf("%s=%s", "--delta-snapshot-lease-name", getDeltaLease(instance)): Equal(fmt.Sprintf("%s=%s", "--delta-snapshot-lease-name", getDeltaLease(instance))), + fmt.Sprintf("%s=%s", "--full-snapshot-lease-name", getFullSnapshotLeaseName(instance)): Equal(fmt.Sprintf("%s=%s", "--full-snapshot-lease-name", getFullSnapshotLeaseName(instance))), + fmt.Sprintf("%s=%s", "--delta-snapshot-lease-name", getDeltaSnapshotLeaseName(instance)): Equal(fmt.Sprintf("%s=%s", "--delta-snapshot-lease-name", getDeltaSnapshotLeaseName(instance))), fmt.Sprintf("%s=%s", "--store-prefix", instance.Spec.Backup.Store.Prefix): Equal(fmt.Sprintf("%s=%s", "--store-prefix", instance.Spec.Backup.Store.Prefix)), fmt.Sprintf("%s=%s", "--storage-provider", store): Equal(fmt.Sprintf("%s=%s", "--storage-provider", store)), fmt.Sprintf("%s=%s", "--store-container", *instance.Spec.Backup.Store.Container): Equal(fmt.Sprintf("%s=%s", "--store-container", *instance.Spec.Backup.Store.Container)), @@ -878,6 +785,7 @@ func validateStoreAlicloudForCmpctJob(instance *druidv1alpha1.Etcd, j *batchv1.J func jobIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, job *batchv1.Job) error { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() + req := types.NamespacedName{ Name: getJobName(instance), Namespace: instance.Namespace, @@ -886,6 +794,11 @@ func jobIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, job if err := c.Get(ctx, req, job); err != nil { return err } + + if job.Status.Failed > 0 { + return fmt.Errorf("Job is running but it's failed") + } + return nil } @@ -919,59 +832,11 @@ func createJob(instance *druidv1alpha1.Etcd) *batchv1.Job { return &j } -func cronJobIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, cj *batchv1beta1.CronJob) error { - ctx, cancel := context.WithTimeout(context.TODO(), timeout) - defer cancel() - req := types.NamespacedName{ - Name: getCronJobName(instance), - Namespace: instance.Namespace, - } - - if err := c.Get(ctx, req, cj); err != nil { - return err - } - return nil -} - -func createCronJob(instance *druidv1alpha1.Etcd) *batchv1beta1.CronJob { - cj := batchv1beta1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: getCronJobName(instance), - Namespace: instance.Namespace, - Labels: instance.Labels, - }, - Spec: batchv1beta1.CronJobSpec{ - Schedule: backupCompactionSchedule, - ConcurrencyPolicy: "Forbid", - JobTemplate: batchv1beta1.JobTemplateSpec{ - Spec: batchv1.JobSpec{ - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: instance.Labels, - }, - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ - { - Name: "compact-backup", - Image: "eu.gcr.io/gardener-project/alpine:3.14", - Command: []string{"sh", "-c", "tail -f /dev/null"}, - }, - }, - }, - }, - }, - }, - }, - } - return &cj -} - func fullLeaseIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, lease *coordinationv1.Lease) error { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() req := types.NamespacedName{ - Name: getFullLease(instance), + Name: getFullSnapshotLeaseName(instance), Namespace: instance.Namespace, } @@ -989,7 +854,7 @@ func deltaLeaseIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Et ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() req := types.NamespacedName{ - Name: getDeltaLease(instance), + Name: getDeltaSnapshotLeaseName(instance), Namespace: instance.Namespace, } diff --git a/main.go b/main.go index 6996ea045..72570b1d7 100644 --- a/main.go +++ b/main.go @@ -75,9 +75,9 @@ func main() { flag.IntVar(&custodianWorkers, "custodian-workers", 3, "Number of worker threads of the custodian controller.") flag.IntVar(&etcdCopyBackupsTaskWorkers, "etcd-copy-backups-task-workers", 3, "Number of worker threads of the EtcdCopyBackupsTask controller.") flag.DurationVar(&custodianSyncPeriod, "custodian-sync-period", 30*time.Second, "Sync period of the custodian controller.") - flag.IntVar(&compactionWorkers, "enable-compaction-job", 1, "Number of worker threads of the CompactionJob controller.") - flag.Int64Var(&eventsThreshold, "events-threshold", 1000000, "Total number of events that can be allowed before a compaction job is triggered") - flag.DurationVar(&activeDeadlineDuration, "active-deadline-duration", 3*time.Hour, "Duration after which a running compaction job will be killed (Ex: \"300ms\", \"20s\", \"-1.5h\" or \"2h45m\")") + flag.IntVar(&compactionWorkers, "compaction-Workers", 3, "Number of worker threads of the CompactionJob controller. CompactionJob controller runs backup compaction job in parallel. CompactionJob controller can be disabled if this flag is set to 0.") + flag.Int64Var(&eventsThreshold, "etcd-events-threshold", 1000000, "Total number of etcd events that can be allowed before a backup compaction job is triggered.") + flag.DurationVar(&activeDeadlineDuration, "active-deadline-duration", 3*time.Hour, "Duration after which a running backup compaction job will be killed (Ex: \"300ms\", \"20s\", \"-1.5h\" or \"2h45m\").") flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index e076454a7..b4de0ec4d 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -19,6 +19,7 @@ import ( v1beta1constants "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants" appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -111,3 +112,33 @@ func StatefulSetStatusChange() predicate.Predicate { }, } } + +// LeaseHolderIdentityChange is a predicate for holderIdentity changes of `Lease` resources. +func LeaseHolderIdentityChange() predicate.Predicate { + holderIdentityChange := func(objOld, objNew client.Object) bool { + leaseOld, ok := objOld.(*coordinationv1.Lease) + if !ok { + return false + } + leaseNew, ok := objNew.(*coordinationv1.Lease) + if !ok { + return false + } + return *leaseOld.Spec.HolderIdentity != *leaseNew.Spec.HolderIdentity + } + + return predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + return true + }, + UpdateFunc: func(event event.UpdateEvent) bool { + return holderIdentityChange(event.ObjectOld, event.ObjectNew) + }, + GenericFunc: func(event event.GenericEvent) bool { + return true + }, + DeleteFunc: func(event event.DeleteEvent) bool { + return true + }, + } +} diff --git a/pkg/predicate/predicate_test.go b/pkg/predicate/predicate_test.go index 483391bae..5a6925def 100644 --- a/pkg/predicate/predicate_test.go +++ b/pkg/predicate/predicate_test.go @@ -15,11 +15,14 @@ package predicate_test import ( + "time" + druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1" v1beta1constants "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants" . "github.com/onsi/ginkgo" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" @@ -108,6 +111,66 @@ var _ = Describe("Druid Predicate", func() { }) }) + Describe("#Lease", func() { + var pred predicate.Predicate + + JustBeforeEach(func() { + pred = LeaseHolderIdentityChange() + }) + + Context("when holder identity matches", func() { + BeforeEach(func() { + renewTime := metav1.NewMicroTime(time.Now()) + obj = &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr("0"), + RenewTime: &renewTime, + }, + } + renewTime = metav1.NewMicroTime(time.Now()) + oldObj = &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr("0"), + RenewTime: &renewTime, + }, + } + }) + + It("should return false", func() { + gomega.Expect(pred.Create(createEvent)).To(gomega.BeTrue()) + gomega.Expect(pred.Update(updateEvent)).To(gomega.BeFalse()) + gomega.Expect(pred.Delete(deleteEvent)).To(gomega.BeTrue()) + gomega.Expect(pred.Generic(genericEvent)).To(gomega.BeTrue()) + }) + }) + + Context("when holder identity differs", func() { + BeforeEach(func() { + renewTime := metav1.NewMicroTime(time.Now()) + obj = &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr("5"), + RenewTime: &renewTime, + }, + } + renewTime = metav1.NewMicroTime(time.Now()) + oldObj = &coordinationv1.Lease{ + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr("0"), + RenewTime: &renewTime, + }, + } + }) + + It("should return true", func() { + gomega.Expect(pred.Create(createEvent)).To(gomega.BeTrue()) + gomega.Expect(pred.Update(updateEvent)).To(gomega.BeTrue()) + gomega.Expect(pred.Delete(deleteEvent)).To(gomega.BeTrue()) + gomega.Expect(pred.Generic(genericEvent)).To(gomega.BeTrue()) + }) + }) + }) + Describe("#LastOperationNotSuccessful", func() { var pred predicate.Predicate