From f6d9b7ef9842f9b9ee86019e7e33391deeee9e76 Mon Sep 17 00:00:00 2001
From: Abhishek Dasgupta
Date: Mon, 25 Jul 2022 13:15:26 +0530
Subject: [PATCH] Added logic for the quorum loss scenario.

---
 config/rbac/role.yaml                       | 116 ++---------
 controllers/cluster_mgmt_controller.go      | 216 ++++++++++++++++++++
 controllers/cluster_mgmt_controller_test.go | 191 +++++++++++++++++
 controllers/config/cluster_mgmt.go          |  24 +++
 controllers/controllers_suite_test.go       |  21 +-
 controllers/etcd_controller.go              |   3 +-
 controllers/etcd_controller_test.go         |  43 ++++
 main.go                                     |  17 ++
 pkg/component/etcd/lease/lease_member.go    |   2 +-
 pkg/component/etcd/lease/lease_test.go      |   4 +-
 pkg/predicate/predicate.go                  |  64 ++++++
 11 files changed, 593 insertions(+), 108 deletions(-)
 create mode 100644 controllers/cluster_mgmt_controller.go
 create mode 100644 controllers/cluster_mgmt_controller_test.go
 create mode 100644 controllers/config/cluster_mgmt.go

diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 4fb453b04..425eb3e38 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -7,151 +7,69 @@ metadata:
   name: manager-role
 rules:
 - apiGroups:
-  - ""
+  - batch
   resources:
-  - pods
+  - jobs
   verbs:
-  - list
-  - watch
+  - create
   - delete
-- apiGroups:
-  - ""
-  resources:
-  - secrets
-  - endpoints
-  verbs:
   - get
   - list
   - patch
   - update
   - watch
 - apiGroups:
-  - ""
+  - coordination.k8s.io
   resources:
-  - events
+  - leases
   verbs:
   - create
+  - delete
+  - deletecollection
   - get
   - list
-  - watch
   - patch
   - update
-- apiGroups:
-  - ""
-  resources:
-  - serviceaccounts
-  verbs:
-  - get
-  - list
   - watch
-  - create
-  - update
-  - patch
-  - delete
 - apiGroups:
-  - rbac.authorization.k8s.io
+  - druid.gardener.cloud
   resources:
-  - roles
-  - rolebindings
+  - etcds
   verbs:
-  - get
-  - list
-  - watch
   - create
-  - update
-  - patch
   - delete
-- apiGroups:
-  - ""
-  - apps
-  resources:
-  - services
-  - configmaps
-  - statefulsets
-  verbs:
   - get
   - list
   - patch
   - update
   - watch
-  - create
-  - delete
-- apiGroups:
-  - batch
-  resources:
-  - jobs
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - batch
-  resources:
-  - cronjobs
-  verbs:
-  - get
-  - list
-  - watch
-  - delete
-- apiGroups:
-  - druid.gardener.cloud
-  resources:
-  - etcds
-  - etcdcopybackupstasks
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
 - apiGroups:
   - druid.gardener.cloud
   resources:
   - etcds/status
-  - etcds/finalizers
-  - etcdcopybackupstasks/status
-  - etcdcopybackupstasks/finalizers
   verbs:
   - get
-  - update
   - patch
-  - create
-- apiGroups:
-  - coordination.k8s.io
-  resources:
-  - leases
-  verbs:
-  - get
-  - list
-  - watch
-  - create
   - update
-  - patch
-  - delete
-  - deletecollection
 - apiGroups:
-  - ""
+  - druid.gardener.cloud
   resources:
-  - persistentvolumeclaims
+  - secrets
   verbs:
   - get
   - list
+  - patch
+  - update
   - watch
 - apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
   verbs:
+  - create
+  - delete
   - get
   - list
-  - watch
-  - create
-  - update
   - patch
-  - delete
+  - update
+  - watch
diff --git a/controllers/cluster_mgmt_controller.go b/controllers/cluster_mgmt_controller.go
new file mode 100644
index 000000000..0b313d06e
--- /dev/null
+++ b/controllers/cluster_mgmt_controller.go
@@ -0,0 +1,216 @@
+// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+
+	"github.com/go-logr/logr"
+	appsv1 "k8s.io/api/apps/v1"
+	coordinationv1 "k8s.io/api/coordination/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/pointer"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+
+	druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
+	controllersconfig "github.com/gardener/etcd-druid/controllers/config"
+	"github.com/gardener/etcd-druid/pkg/health/etcdmember"
+	druidpredicates "github.com/gardener/etcd-druid/pkg/predicate"
+	"github.com/gardener/gardener/pkg/controllerutils"
+	kutil "github.com/gardener/gardener/pkg/utils/kubernetes"
+)
+
+const clusterMgmtControllerName = "cluster-mgmt-controller"
+
+// ClusterMgmtController reconciles multinode ETCD clusters
+type ClusterMgmtController struct {
+	client.Client
+	logger logr.Logger
+	config controllersconfig.ClusterMgmtConfig
+}
+
+// NewClusterMgmtController creates a new ClusterMgmtController object
+func NewClusterMgmtController(mgr manager.Manager, config controllersconfig.ClusterMgmtConfig) *ClusterMgmtController {
+	return &ClusterMgmtController{
+		Client: mgr.GetClient(),
+		logger: log.Log.WithName("cluster-mgmt-controller"),
+		config: config,
+	}
+}
+
+// SetupWithManager sets up manager with a new controller and cmc as the reconcile.Reconciler
+func (cmc *ClusterMgmtController) SetupWithManager(mgr ctrl.Manager, workers int) error {
+	c, err := controller.New(clusterMgmtControllerName, mgr, controller.Options{
+		Reconciler:              cmc,
+		MaxConcurrentReconciles: workers,
+	})
+	if err != nil {
+		return err
+	}
+
+	// Watch the member leases owned by the Etcd object; an expiring member lease is the
+	// signal that a member may have gone down.
+	return c.Watch(
+		&source.Kind{Type: &coordinationv1.Lease{}},
+		&handler.EnqueueRequestForOwner{OwnerType: &druidv1alpha1.Etcd{}, IsController: true},
+		// druidpredicates.LeaseHolderIdentityChange(),
+		druidpredicates.IsMemberLease(),
+	)
+}
+
+// +kubebuilder:rbac:groups=druid.gardener.cloud,resources=etcds,verbs=get;list;watch
+// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete
+
+// Reconcile reconciles the multinode ETCD cluster.
+func (cmc *ClusterMgmtController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	cmc.logger.Info("Cluster management controller reconciliation started")
+	etcd := &druidv1alpha1.Etcd{}
+	if err := cmc.Get(ctx, req.NamespacedName, etcd); err != nil {
+		if errors.IsNotFound(err) {
+			// Object not found, return. Created objects are automatically garbage collected.
+			// For additional cleanup logic use finalizers.
+			return ctrl.Result{}, nil
+		}
+		// Error reading the object - requeue the request.
+		return ctrl.Result{
+			RequeueAfter: 10 * time.Second,
+		}, err
+	}
+
+	logger := cmc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())
+
+	if !etcd.DeletionTimestamp.IsZero() {
+		return ctrl.Result{Requeue: false}, nil
+	}
+
+	if etcd.Spec.Replicas <= 1 {
+		return ctrl.Result{Requeue: false}, nil
+	}
+
+	// Allow some time before the quorum loss check actually happens
+	startTime := time.Now()
+	if !startTime.After(etcd.CreationTimestamp.Add(cmc.config.WaitDuration)) {
+		return ctrl.Result{RequeueAfter: 2 * time.Minute}, nil
+	}
+
+	// Give the members time to renew their leases before they are evaluated.
+	time.Sleep(2 * time.Minute)
+	unknownThreshold := 300 * time.Second
+	notReadyThreshold := 60 * time.Second
+
+	checker := etcdmember.ReadyCheck(cmc.Client, logger, controllersconfig.EtcdCustodianController{
+		EtcdMember: controllersconfig.EtcdMemberConfig{
+			EtcdMemberNotReadyThreshold: notReadyThreshold,
+			EtcdMemberUnknownThreshold:  unknownThreshold,
+		},
+	})
+
+	results := checker.Check(ctx, *etcd)
+	totalReadyMembers := 0
+
+	for _, result := range results {
+		if result.Status() == "LeaseSucceeded" {
+			totalReadyMembers++
+		}
+	}
+
+	// Quorum of an N-member cluster is floor(N/2)+1, e.g. 2 for a 3-member cluster.
+	quorum := int(etcd.Spec.Replicas)/2 + 1
+
+	if totalReadyMembers < quorum {
+		// scale down the statefulset to 0
+		sts := &appsv1.StatefulSet{}
+		err := cmc.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, sts)
+		if err != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("could not fetch statefulset: %v", err)
+		}
+
+		logger.Info("Scaling down the statefulset to 0")
+		if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
+			sts.Spec.Replicas = pointer.Int32(0)
+			return nil
+		}); err != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("could not scale down statefulset to 0: %v", err)
+		}
+
+		sts = &appsv1.StatefulSet{}
+		err = cmc.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, sts)
+		if err != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("could not fetch statefulset: %v", err)
+		}
+		logger.Info("Scaled down the statefulset", "statefulset", sts.Name, "replicas", *sts.Spec.Replicas)
+
+		logger.Info("Deleting PVCs")
+		// delete the pvcs
+		if err := cmc.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{},
+			client.InNamespace(sts.GetNamespace()),
+			client.MatchingLabels(getMatchingLabels(sts))); client.IgnoreNotFound(err) != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("could not delete pvcs: %v", err)
+		}
+
+		logger.Info("Scaling up the statefulset to 1")
+		// scale up the statefulset to 1
+		if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
+			sts.Spec.Replicas = pointer.Int32(1)
+			return nil
+		}); err != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("could not scale up statefulset to 1: %v", err)
+		}
+
+		// scale up the statefulset to ETCD replicas
+		logger.Info("Scaling up the statefulset to the number of replicas mentioned in ETCD spec")
+		if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
+			sts.Spec.Replicas = &etcd.Spec.Replicas
+			return nil
+		}); err != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("could not scale up statefulset to replica number: %v", err)
+		}
+	}
+	return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+}
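+
+// getMatchingLabels returns the name and instance labels of the given StatefulSet; the
+// controller uses them to select the PVCs that belong to the StatefulSet's pods.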
+func getMatchingLabels(sts *appsv1.StatefulSet) map[string]string {
+	labels := make(map[string]string)
+
+	labels["name"] = sts.Labels["name"]
+	labels["instance"] = sts.Labels["instance"]
+
+	return labels
+}
diff --git a/controllers/cluster_mgmt_controller_test.go b/controllers/cluster_mgmt_controller_test.go
new file mode 100644
index 000000000..0d80535aa
--- /dev/null
+++ b/controllers/cluster_mgmt_controller_test.go
@@ -0,0 +1,191 @@
+// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
+	"github.com/gardener/gardener/pkg/controllerutils"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	appsv1 "k8s.io/api/apps/v1"
+	coordinationv1 "k8s.io/api/coordination/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/retry"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+)
+
+var (
+	unknownThreshold  = 300 * time.Second
+	notReadyThreshold = 60 * time.Second
+	expire            = time.Minute * 3
+)
+
+var _ = Describe("Cluster Management Controller", func() {
+	Context("when quorum is lost for multinode ETCD cluster", func() {
+		var (
+			err                error
+			instance           *druidv1alpha1.Etcd
+			c                  client.Client
+			s                  *appsv1.StatefulSet
+			cm                 *corev1.ConfigMap
+			svc                *corev1.Service
+			now                = time.Now()
+			longExpirationTime = metav1.NewMicroTime(now.Add(-1 * unknownThreshold).Add(-1 * time.Second).Add(-1 * notReadyThreshold))
+		)
+		BeforeEach(func() {
+			instance = getMultinodeEtcdDefault("foo444", "default")
+			c = mgr.GetClient()
+			ns := corev1.Namespace{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: instance.Namespace,
+				},
+			}
+
+			_, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil })
+			Expect(err).To(Not(HaveOccurred()))
+
+			err = c.Create(context.TODO(), instance)
+			Expect(err).NotTo(HaveOccurred())
+			s = &appsv1.StatefulSet{}
+			Eventually(func() error { return statefulsetIsCorrectlyReconciled(c, instance, s) }, timeout, pollingInterval).Should(BeNil())
+			cm = &corev1.ConfigMap{}
+			Eventually(func() error { return configMapIsCorrectlyReconciled(c, instance, cm) }, timeout, pollingInterval).Should(BeNil())
+			svc = &corev1.Service{}
+			Eventually(func() error { return clientServiceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil())
+		})
+		It("should recover the cluster when the renew time of member leases has expired", func() {
+			// Sleep until wait duration for the controller is over
+			time.Sleep(waitDuration)
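+
+			// Expiring two of the three member leases drops the number of ready members
+			// below the quorum of 2, which should trigger the controller's recovery flow.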
+			// Deliberately update the first member lease
+			memberLease := &coordinationv1.Lease{}
+			Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 1) }, timeout, pollingInterval).Should(BeNil())
+			err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error {
+				memberLease.Spec.RenewTime = &longExpirationTime
+				return nil
+			})
+			Expect(err).To(Not(HaveOccurred()))
+
+			// Deliberately update the second member lease
+			memberLease = &coordinationv1.Lease{}
+			Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 2) }, timeout, pollingInterval).Should(BeNil())
+			err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error {
+				memberLease.Spec.RenewTime = &longExpirationTime
+				return nil
+			})
+			Expect(err).To(Not(HaveOccurred()))
+
+			// Check if statefulset replicas is scaled down to 0
+			s = &appsv1.StatefulSet{}
+			Eventually(func() error { return statefulsetIsScaled(c, instance, s, 0) }, timeout, pollingInterval).Should(BeNil())
+
+			// Check if statefulset replicas is scaled up to 1
+			s = &appsv1.StatefulSet{}
+			Eventually(func() error { return statefulsetIsScaled(c, instance, s, 1) }, timeout, pollingInterval).Should(BeNil())
+
+			// Check if statefulset replicas is scaled up to etcd replicas
+			s = &appsv1.StatefulSet{}
+			Eventually(func() error { return statefulsetIsScaled(c, instance, s, instance.Spec.Replicas) }, timeout, pollingInterval).Should(BeNil())
+		})
+
+		AfterEach(func() {
+			Expect(c.Delete(context.TODO(), instance)).To(Succeed())
+			Eventually(func() error { return statefulSetRemoved(c, s) }, timeout, pollingInterval).Should(BeNil())
+			Eventually(func() error { return etcdRemoved(c, instance) }, timeout, pollingInterval).Should(BeNil())
+		})
+	})
+})
+
+func getMultinodeEtcdDefault(name, namespace string) *druidv1alpha1.Etcd {
+	instance := &druidv1alpha1.Etcd{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Spec: druidv1alpha1.EtcdSpec{
+			Annotations: map[string]string{
+				"app":      "etcd-statefulset",
+				"role":     "test",
+				"instance": name,
+			},
+			Labels: map[string]string{
+				"name":     "etcd",
+				"instance": name,
+			},
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"name":     "etcd",
+					"instance": name,
+				},
+			},
+			Replicas: 3,
+			Backup:   druidv1alpha1.BackupSpec{},
+			Etcd:     druidv1alpha1.EtcdConfig{},
+			Common:   druidv1alpha1.SharedConfig{},
+		},
+	}
+	return instance
+}
+
+func fetchMemberLease(c client.Client, instance *druidv1alpha1.Etcd, lease *coordinationv1.Lease, replica int) error {
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+	req := types.NamespacedName{
+		Name:      memberLeaseName(instance.Name, replica),
+		Namespace: instance.Namespace,
+	}
+
+	if err := c.Get(ctx, req, lease); err != nil {
+		return err
+	}
+
+	if !checkEtcdOwnerReference(lease.GetOwnerReferences(), instance) {
+		return fmt.Errorf("ownerReference does not exist for lease")
+	}
+	return nil
+}
+
+func memberLeaseName(etcdName string, replica int) string {
+	return fmt.Sprintf("%s-%d-member", etcdName, replica)
+}
+
+func statefulsetIsScaled(c client.Client, instance *druidv1alpha1.Etcd, ss *appsv1.StatefulSet, replicas int32) error {
+	ctx, cancel := context.WithTimeout(context.TODO(), expire)
+	defer cancel()
+	req := types.NamespacedName{
+		Name:      instance.Name,
+		Namespace: instance.Namespace,
+	}
+
+	if err := c.Get(ctx, req, ss); err != nil {
+		return err
+	}
+
+	stsReplicas := *ss.Spec.Replicas
+	if stsReplicas != replicas {
+		return fmt.Errorf("statefulset replicas are still %d instead of %d", stsReplicas, replicas)
+	}
+
+	return nil
+}
diff --git a/controllers/config/cluster_mgmt.go b/controllers/config/cluster_mgmt.go
new file mode 100644
index 000000000..7c799a0b3
--- /dev/null
+++ b/controllers/config/cluster_mgmt.go
@@ -0,0 +1,24 @@
+// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import "time"
+
+// ClusterMgmtConfig contains configuration for the cluster management controller.
+type ClusterMgmtConfig struct {
+	// WaitDuration is the duration after which the cluster management controller starts checking ETCD member health to determine
+	// whether quorum loss has happened (Ex: "300ms", "20s", "-1.5h" or "2h45m").
+	WaitDuration time.Duration
+}
diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go
index 666725bea..7f6beb3ab 100644
--- a/controllers/controllers_suite_test.go
+++ b/controllers/controllers_suite_test.go
@@ -51,6 +51,7 @@ var (
 	mgrStopped *sync.WaitGroup
 
 	activeDeadlineDuration   time.Duration
+	waitDuration             time.Duration
 	backupCompactionSchedule = "15 */24 * * *"
 
 	revertFns []func()
@@ -105,12 +106,12 @@ var _ = BeforeSuite(func(done Done) {
 	er, err := NewEtcdReconcilerWithImageVector(mgr, false)
 	Expect(err).NotTo(HaveOccurred())
 
-	err = er.SetupWithManager(mgr, 1, true)
+	err = er.SetupWithManager(mgr, 5, true)
 	Expect(err).NotTo(HaveOccurred())
 
 	secret := NewSecret(mgr)
 
-	err = secret.SetupWithManager(mgr, 1)
+	err = secret.SetupWithManager(mgr, 5)
 	Expect(err).NotTo(HaveOccurred())
 
 	custodian := NewEtcdCustodian(mgr, controllersconfig.EtcdCustodianController{
@@ -119,13 +120,13 @@ var _ = BeforeSuite(func(done Done) {
 		},
 	})
 
-	err = custodian.SetupWithManager(mgrCtx, mgr, 1, true)
+	err = custodian.SetupWithManager(mgrCtx, mgr, 5, true)
 	Expect(err).NotTo(HaveOccurred())
 
 	etcdCopyBackupsTaskReconciler, err := NewEtcdCopyBackupsTaskReconcilerWithImageVector(mgr)
 	Expect(err).NotTo(HaveOccurred())
 
-	err = etcdCopyBackupsTaskReconciler.SetupWithManager(mgr, 1)
+	err = etcdCopyBackupsTaskReconciler.SetupWithManager(mgr, 5)
 	Expect(err).NotTo(HaveOccurred())
 
 	activeDeadlineDuration, err = time.ParseDuration("2m")
@@ -138,7 +139,17 @@ var _ = BeforeSuite(func(done Done) {
 	})
 	Expect(err).NotTo(HaveOccurred())
 
-	err = lc.SetupWithManager(mgr, 1)
+	err = lc.SetupWithManager(mgr, 5)
+	Expect(err).NotTo(HaveOccurred())
+
+	waitDuration, err = time.ParseDuration("3m")
+	Expect(err).NotTo(HaveOccurred())
+
+	cmc := NewClusterMgmtController(mgr, controllersconfig.ClusterMgmtConfig{
+		WaitDuration: waitDuration,
+	})
+
+	err = cmc.SetupWithManager(mgr, 5)
 	Expect(err).NotTo(HaveOccurred())
 
 	mgrStopped = startTestManager(mgrCtx, mgr)
diff --git a/controllers/etcd_controller.go b/controllers/etcd_controller.go
index 515af17cc..83ab69acd 100644
--- a/controllers/etcd_controller.go
+++ b/controllers/etcd_controller.go
@@ -196,7 +196,7 @@ func (r *EtcdReconciler) SetupWithManager(mgr ctrl.Manager, workers int, ignoreO
 	builder := ctrl.NewControllerManagedBy(mgr).WithOptions(controller.Options{
 		MaxConcurrentReconciles: workers,
 	})
-	builder = builder.WithEventFilter(buildPredicate(ignoreOperationAnnotation)).For(&druidv1alpha1.Etcd{})
+	builder = builder.WithEventFilter(buildPredicate(ignoreOperationAnnotation)).WithEventFilter(druidpredicates.StatefulSetStatusChange()).For(&druidv1alpha1.Etcd{})
 	if ignoreOperationAnnotation {
 		builder = builder.Owns(&corev1.Service{}).
 			Owns(&corev1.ConfigMap{}).
@@ -692,6 +692,7 @@ func (r *EtcdReconciler) reconcileEtcd(ctx context.Context, logger logr.Logger,
 		return nil, nil, err
 	}
 
+	logger.Info("Creating statefulset")
 	stsDeployer := componentsts.New(r.Client, etcd.Namespace, val.StatefulSet)
 	err = stsDeployer.Deploy(ctx)
 
diff --git a/controllers/etcd_controller_test.go b/controllers/etcd_controller_test.go
index a7eef0fda..fc28670e8 100644
--- a/controllers/etcd_controller_test.go
+++ b/controllers/etcd_controller_test.go
@@ -460,9 +460,18 @@ var _ = Describe("Druid", func() {
 			rb = &rbac.RoleBinding{}
 			Eventually(func() error { return roleBindingIsCorrectlyReconciled(c, instance, rb) }, timeout, pollingInterval).Should(BeNil())
 
+			Eventually(func() error { return setRenewTimeForMemberLeases(c, instance) }, timeout, pollingInterval).Should(BeNil())
+
 			validate(instance, s, cm, clSvc, prSvc)
 			validateRole(instance, role)
 
+			req := types.NamespacedName{
+				Name:      instance.Name,
+				Namespace: instance.Namespace,
+			}
+
+			err = c.Get(context.TODO(), req, s)
+			Expect(err).NotTo(HaveOccurred())
 			setStatefulSetReady(s)
 			err = c.Status().Update(context.TODO(), s)
 			Expect(err).NotTo(HaveOccurred())
@@ -708,6 +717,9 @@ var _ = Describe("Multinode ETCD", func() {
 			svc = &corev1.Service{}
 			Eventually(func() error { return clientServiceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil())
 
+			Eventually(func() error { return setRenewTimeForMemberLeases(c, instance) }, timeout, pollingInterval).Should(BeNil())
+
 			// Validate statefulset
+			Expect(sts.Spec.Replicas).ShouldNot(BeNil())
 			Expect(*sts.Spec.Replicas).To(Equal(int32(instance.Spec.Replicas)))
 			if instance.Spec.Replicas == 1 {
@@ -1943,6 +1955,37 @@ func statefulsetIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.E
 	return nil
 }
 
+func setRenewTimeForMemberLeases(c client.Client, instance *druidv1alpha1.Etcd) error {
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	for i := 0; i < int(instance.Spec.Replicas); i++ {
+		leaseName := memberLeaseName(instance.Name, i)
+
+		req := types.NamespacedName{
+			Name:      leaseName,
+			Namespace: instance.Namespace,
+		}
+
+		ls := &coordinationv1.Lease{}
+		if err := c.Get(ctx, req, ls); err != nil {
+			return err
+		}
+
+		setTime := metav1.NewMicroTime(time.Now())
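+		// A renew time of "now" marks the member lease as recently renewed, so the
+		// readiness checks treat the corresponding member as ready.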
+		if err := controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, ls, func() error {
+			ls.Spec.RenewTime = &setTime
+			return nil
+		}); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func configMapIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, cm *corev1.ConfigMap) error {
 	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 	defer cancel()
diff --git a/main.go b/main.go
index 4936bfc90..f4d9799b4 100644
--- a/main.go
+++ b/main.go
@@ -46,11 +46,13 @@ func main() {
 		custodianWorkers           int
 		secretWorkers              int
 		etcdCopyBackupsTaskWorkers int
+		clusterMgmtWorkers         int
 		custodianSyncPeriod        time.Duration
 		disableLeaseCache          bool
 		compactionWorkers          int
 		eventsThreshold            int64
 		activeDeadlineDuration     time.Duration
+		waitDuration               time.Duration
 
 		ignoreOperationAnnotation          bool
 		disableEtcdServiceAccountAutomount bool
@@ -71,6 +73,7 @@ func main() {
 	flag.IntVar(&compactionWorkers, "compaction-workers", 3, "Number of worker threads of the CompactionJob controller. The controller creates a backup compaction job if a certain etcd event threshold is reached. Setting this flag to 0 disabled the controller.")
 	flag.Int64Var(&eventsThreshold, "etcd-events-threshold", 1000000, "Total number of etcd events that can be allowed before a backup compaction job is triggered.")
 	flag.DurationVar(&activeDeadlineDuration, "active-deadline-duration", 3*time.Hour, "Duration after which a running backup compaction job will be killed (Ex: \"300ms\", \"20s\", \"-1.5h\" or \"2h45m\").")
+	flag.DurationVar(&waitDuration, "wait-duration", 15*time.Minute, "Minimum duration after which the cluster management controller starts checking whether quorum is lost. Recommended to allow at least 15 minutes. (Ex: \"300ms\", \"20s\", \"-1.5h\" or \"2h45m\")")
 	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
 	flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
@@ -83,6 +86,7 @@ func main() {
 	flag.DurationVar(&etcdMemberNotReadyThreshold, "etcd-member-notready-threshold", 5*time.Minute, "Threshold after which an etcd member is considered not ready if the status was unknown before.")
 	flag.BoolVar(&disableEtcdServiceAccountAutomount, "disable-etcd-serviceaccount-automount", false, "If true then .automountServiceAccountToken will be set to false for the ServiceAccount created for etcd statefulsets.")
 	flag.DurationVar(&etcdMemberUnknownThreshold, "etcd-member-unknown-threshold", 1*time.Minute, "Threshold after which an etcd member is considered unknown.")
+	flag.IntVar(&clusterMgmtWorkers, "cluster-mgmt-workers", 3, "Number of worker threads of the Cluster Management controller.")
 
 	flag.Parse()
 
@@ -166,6 +170,19 @@ func main() {
 		os.Exit(1)
 	}
 
+	cmc := controllers.NewClusterMgmtController(mgr, controllersconfig.ClusterMgmtConfig{
+		WaitDuration: waitDuration,
+	})
+	if err := cmc.SetupWithManager(mgr, clusterMgmtWorkers); err != nil {
+		setupLog.Error(err, "Unable to create controller", "Controller", "Cluster Management")
+		os.Exit(1)
+	}
+
+	if err := etcdCopyBackupsTask.SetupWithManager(mgr, etcdCopyBackupsTaskWorkers); err != nil {
+		setupLog.Error(err, "Unable to create controller", "Controller", "EtcdCopyBackupsTask")
+		os.Exit(1)
+	}
+
 	// +kubebuilder:scaffold:builder
 
 	setupLog.Info("Starting manager")
diff --git a/pkg/component/etcd/lease/lease_member.go b/pkg/component/etcd/lease/lease_member.go
index ce7689846..b334ee840 100644
--- a/pkg/component/etcd/lease/lease_member.go
+++ b/pkg/component/etcd/lease/lease_member.go
@@ -98,5 +98,5 @@ func getMemberLeaseLabels(val Values) map[string]string {
 }
 
 func memberLeaseName(etcdName string, replica int) string {
-	return fmt.Sprintf("%s-%d", etcdName, replica)
+	return fmt.Sprintf("%s-%d-member", etcdName, replica)
 }
diff --git a/pkg/component/etcd/lease/lease_test.go b/pkg/component/etcd/lease/lease_test.go
index c81238891..caf233872 100644
--- a/pkg/component/etcd/lease/lease_test.go
+++ b/pkg/component/etcd/lease/lease_test.go
@@ -248,7 +248,7 @@ func checkMemberLeases(ctx context.Context, c client.Client, etcd *druidv1alpha1
 func memberLeases(name string, etcdUID types.UID, replicas int32) []interface{} {
 	var elements []interface{}
 	for i := 0; i < int(replicas); i++ {
-		elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d", name, i), name, etcdUID))
+		elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d-member", name, i), name, etcdUID))
 	}
 
 	return elements
@@ -273,7 +273,7 @@ func matchLeaseElement(leaseName, etcdName string, etcdUID types.UID) gomegatype
 func memberLease(etcd *druidv1alpha1.Etcd, replica int, withOwnerRef bool) coordinationv1.Lease {
 	lease := coordinationv1.Lease{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      fmt.Sprintf("%s-%d", etcd.Name, replica),
+			Name:      fmt.Sprintf("%s-%d-member", etcd.Name, replica),
 			Namespace: etcd.Namespace,
 			Labels: map[string]string{
 				common.GardenerOwnedBy: etcd.Name,
diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go
index 145d22723..3e1ad6bd8 100644
--- a/pkg/predicate/predicate.go
+++ b/pkg/predicate/predicate.go
@@ -82,6 +82,43 @@ func LastOperationNotSuccessful() predicate.Predicate {
 	}
 }
 
+// StatefulsetReplicasChange is a predicate for changes in the replicas of a StatefulSet.
+func StatefulsetReplicasChange() predicate.Predicate {
+	replicasChange := func(objOld, objNew client.Object) bool {
+		stsOld, ok := objOld.(*appsv1.StatefulSet)
+		if !ok {
+			return false
+		}
+		stsNew, ok := objNew.(*appsv1.StatefulSet)
+		if !ok {
+			return false
+		}
+		// Compare the dereferenced replica counts; comparing the pointers themselves
+		// would report a change on almost every update even when the counts are equal.
+		if stsOld.Spec.Replicas == nil || stsNew.Spec.Replicas == nil {
+			return stsOld.Spec.Replicas != stsNew.Spec.Replicas
+		}
+		return *stsOld.Spec.Replicas != *stsNew.Spec.Replicas
+	}
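+
+	// Only update events are filtered on an actual replica change; create, delete,
+	// and generic events always pass through.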
+	return predicate.Funcs{
+		CreateFunc: func(event event.CreateEvent) bool {
+			return true
+		},
+		UpdateFunc: func(event event.UpdateEvent) bool {
+			return replicasChange(event.ObjectOld, event.ObjectNew)
+		},
+		GenericFunc: func(event event.GenericEvent) bool {
+			return true
+		},
+		DeleteFunc: func(event event.DeleteEvent) bool {
+			return true
+		},
+	}
+}
+
 // StatefulSetStatusChange is a predicate for status changes of `StatefulSet` resources.
 func StatefulSetStatusChange() predicate.Predicate {
 	statusChange := func(objOld, objNew client.Object) bool {
@@ -216,3 +253,30 @@ func IsSnapshotLease() predicate.Predicate {
 		},
 	}
 }
+
+// IsMemberLease is a predicate that is `true` if the passed lease object is a member lease.
+func IsMemberLease() predicate.Predicate {
+	isMemberLease := func(obj client.Object) bool {
+		lease, ok := obj.(*coordinationv1.Lease)
+		if !ok {
+			return false
+		}
+
+		return strings.HasSuffix(lease.Name, "member")
+	}
+
+	return predicate.Funcs{
+		CreateFunc: func(event event.CreateEvent) bool {
+			return isMemberLease(event.Object)
+		},
+		UpdateFunc: func(event event.UpdateEvent) bool {
+			return isMemberLease(event.ObjectNew)
+		},
+		GenericFunc: func(event event.GenericEvent) bool {
+			return isMemberLease(event.Object)
+		},
+		DeleteFunc: func(event event.DeleteEvent) bool {
+			return isMemberLease(event.Object)
+		},
+	}
+}