Skip to content

Commit

Permalink
Added logics for quorum loss scenario.
Browse files Browse the repository at this point in the history
  • Loading branch information
abdasgupta committed Jul 25, 2022
1 parent 51b3cfe commit 988bb5b
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 2 deletions.
192 changes: 192 additions & 0 deletions controllers/cluster_mgmt_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"context"
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
controllersconfig "github.com/gardener/etcd-druid/controllers/config"
"github.com/gardener/etcd-druid/pkg/health/etcdmember"
druidpredicates "github.com/gardener/etcd-druid/pkg/predicate"
"github.com/gardener/gardener/pkg/controllerutils"
"github.com/gardener/gardener/pkg/utils/imagevector"
kutil "github.com/gardener/gardener/pkg/utils/kubernetes"
)

const (
	// clusterMgmtControllerName is the name under which the cluster management
	// controller is created.
	clusterMgmtControllerName = "cluster-mgmt-controller"
)

// ClusterMgmtController reconciles ETCD multinode cluster
type ClusterMgmtController struct {
// Client is the embedded Kubernetes client used to read and modify cluster objects.
client.Client
// logger is the base logger for this controller.
logger logr.Logger
// ImageVector is an exported image vector.
// NOTE(review): NewClusterMgmtController does not populate it; confirm where it is set and used.
ImageVector imagevector.ImageVector
}

// NewClusterMgmtController builds a ClusterMgmtController that uses the client of
// the given manager and a logger named "cluster-mgmt-controller".
func NewClusterMgmtController(mgr manager.Manager) *ClusterMgmtController {
	cmc := new(ClusterMgmtController)
	cmc.Client = mgr.GetClient()
	cmc.logger = log.Log.WithName("cluster-mgmt-controller")
	return cmc
}

// SetupWithManager registers a new controller with mgr that uses cmc as its
// reconciler and runs at most `workers` reconciliations concurrently. The
// controller watches Lease objects that pass the IsMemberLease predicate and
// enqueues a request for the Etcd that is the controlling owner of the lease.
func (cmc *ClusterMgmtController) SetupWithManager(mgr ctrl.Manager, workers int) error {
	opts := controller.Options{
		Reconciler:              cmc,
		MaxConcurrentReconciles: workers,
	}

	// The local is deliberately not called "ctrl" so that it does not shadow the
	// imported controller-runtime package alias.
	c, err := controller.New(clusterMgmtControllerName, mgr, opts)
	if err != nil {
		return err
	}

	leaseSource := &source.Kind{Type: &coordinationv1.Lease{}}
	enqueueOwningEtcd := &handler.EnqueueRequestForOwner{OwnerType: &druidv1alpha1.Etcd{}, IsController: true}

	// The holder-identity predicate is currently disabled:
	// druidpredicates.LeaseHolderIdentityChange(),
	return c.Watch(leaseSource, enqueueOwningEtcd, druidpredicates.IsMemberLease())
}

// +kubebuilder:rbac:groups=druid.gardener.cloud,resources=etcds,verbs=get;list;watch
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete

// Reconcile reconciles the multinode ETCD cluster.
//
// It fetches the Etcd named by req, runs the etcdmember ready check against it
// and counts the members reported as ready. When that count is below quorum
// (replicas/2 + 1) it performs quorum-loss recovery: the StatefulSet with the
// Etcd's name is scaled to 0, the PVCs matching the StatefulSet's labels are
// deleted, and the StatefulSet is scaled to 1 and then to the Etcd's replicas.
//
// Every invocation performs exactly one pass and returns. Periodic monitoring
// is achieved by asking the controller to requeue the request instead of
// looping inside the worker, so the Etcd object is re-read on each pass and the
// worker is never blocked.
//
// Returns an empty result when the Etcd no longer exists or is being deleted,
// and a non-nil error (wrapping the cause) when any API call fails.
func (cmc *ClusterMgmtController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	const (
		// errorRequeueInterval is the delay requested after a failed API call.
		errorRequeueInterval = 10 * time.Second
		// healthCheckInterval is the period between two cluster health checks.
		healthCheckInterval = 5 * time.Minute

		unknownThreshold  = 300 * time.Second
		notReadyThreshold = 60 * time.Second
	)
	retry := ctrl.Result{RequeueAfter: errorRequeueInterval}

	cmc.logger.Info("Cluster management controller reconciliation started")
	etcd := &druidv1alpha1.Etcd{}
	if err := cmc.Get(ctx, req.NamespacedName, etcd); err != nil {
		if errors.IsNotFound(err) {
			// Object not found, return. Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return ctrl.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return retry, err
	}

	// Nothing to manage for a cluster that is being deleted.
	if !etcd.DeletionTimestamp.IsZero() {
		return ctrl.Result{}, nil
	}

	logger := cmc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())

	checker := etcdmember.ReadyCheck(cmc.Client, logger, controllersconfig.EtcdCustodianController{
		EtcdMember: controllersconfig.EtcdMemberConfig{
			EtcdMemberNotReadyThreshold: notReadyThreshold,
			EtcdMemberUnknownThreshold:  unknownThreshold,
		},
	})

	// Use the request context so the check is cancelled together with the reconciliation.
	totalReadyMembers := 0
	for _, result := range checker.Check(ctx, *etcd) {
		// NOTE(review): "LeaseSucceeded" reads like a reason rather than a status;
		// confirm this is the value Status() returns for a ready member.
		if result.Status() == "LeaseSucceeded" {
			totalReadyMembers++
		}
	}

	quorum := int(etcd.Spec.Replicas)/2 + 1
	if totalReadyMembers >= quorum {
		// Quorum is intact; check again after the monitoring interval.
		return ctrl.Result{RequeueAfter: healthCheckInterval}, nil
	}

	logger.Info("Quorum lost, starting recovery", "readyMembers", totalReadyMembers, "quorum", quorum)

	sts := &appsv1.StatefulSet{}
	if err := cmc.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, sts); err != nil {
		return retry, fmt.Errorf("could not fetch statefulset: %w", err)
	}

	// scale patches the StatefulSet to the given number of replicas. A fresh
	// pointer is used each time so the StatefulSet never aliases the Etcd spec.
	scale := func(replicas int32) error {
		_, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
			sts.Spec.Replicas = pointer.Int32(replicas)
			return nil
		})
		return err
	}

	// scale down the statefulset to 0
	if err := scale(0); err != nil {
		return retry, fmt.Errorf("could not scale down statefulset to 0: %w", err)
	}

	// delete the pvcs
	// NOTE(review): this is issued right after the scale-down patch without
	// waiting for the pods to terminate; confirm that this is sufficient.
	if err := cmc.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{},
		client.InNamespace(sts.GetNamespace()),
		client.MatchingLabels(getMatchingLabels(sts))); err != nil {
		return retry, fmt.Errorf("could not delete pvcs: %w", err)
	}

	// scale up the statefulset to 1
	if err := scale(1); err != nil {
		return retry, fmt.Errorf("could not scale up statefulset to 1: %w", err)
	}

	// scale up the statefulset to ETCD replicas
	if err := scale(etcd.Spec.Replicas); err != nil {
		return retry, fmt.Errorf("could not scale up statefulset to replica number: %w", err)
	}

	// Do not re-check immediately: the member leases are still stale right after
	// recovery and an immediate pass would trigger the recovery again.
	return ctrl.Result{RequeueAfter: healthCheckInterval}, nil
}

// getMatchingLabels returns a new label set holding the "name" and "instance"
// labels copied from the given StatefulSet. A label that is absent on the
// StatefulSet is returned with an empty value.
func getMatchingLabels(sts *appsv1.StatefulSet) map[string]string {
	return map[string]string{
		"name":     sts.Labels["name"],
		"instance": sts.Labels["instance"],
	}
}
183 changes: 183 additions & 0 deletions controllers/cluster_mgmt_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controllers

import (
"context"
"fmt"
"time"

druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
"github.com/gardener/gardener/pkg/controllerutils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// unknownThreshold is the longer of the two lease staleness thresholds used by
// these tests to build an expired lease renew time.
var unknownThreshold = 300 * time.Second

// notReadyThreshold is the shorter of the two lease staleness thresholds used by
// these tests to build an expired lease renew time.
var notReadyThreshold = 60 * time.Second

var _ = FDescribe("Cluster Management Controller", func() {
FContext("when quorum is lost for multinode ETCD cluster", func() {
var (
err error
instance *druidv1alpha1.Etcd
c client.Client
s *appsv1.StatefulSet
cm *corev1.ConfigMap
svc *corev1.Service
now time.Time
longExpirationTime = metav1.NewMicroTime(now.Add(-1 * unknownThreshold).Add(-1 * time.Second).Add(-1 * notReadyThreshold))
)
BeforeEach(func() {
instance = getMultinodeEtcdDefault("foo444", "default")
c = mgr.GetClient()
ns := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: instance.Namespace,
},
}

_, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil })
Expect(err).To(Not(HaveOccurred()))

err = c.Create(context.TODO(), instance)
Expect(err).NotTo(HaveOccurred())
s = &appsv1.StatefulSet{}
Eventually(func() error { return statefulsetIsCorrectlyReconciled(c, instance, s) }, timeout, pollingInterval).Should(BeNil())
cm = &corev1.ConfigMap{}
Eventually(func() error { return configMapIsCorrectlyReconciled(c, instance, cm) }, timeout, pollingInterval).Should(BeNil())
svc = &corev1.Service{}
Eventually(func() error { return clientServiceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil())
})
FIt("when renew time of member leases expired", func() {
// Deliberately update the first member lease
memberLease := &coordinationv1.Lease{}
Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 1) }, timeout, pollingInterval).Should(BeNil())
err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error {
memberLease.Spec.RenewTime = &longExpirationTime
return nil
})

// Deliberately update the second member lease
memberLease = &coordinationv1.Lease{}
Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 2) }, timeout, pollingInterval).Should(BeNil())
err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error {
memberLease.Spec.RenewTime = &longExpirationTime
return nil
})

// Check if statefulset replicas is scaled down to 0
s = &appsv1.StatefulSet{}
Eventually(func() error { return statefulsetIsScaled(c, instance, s, 0) }, timeout, pollingInterval).Should(BeNil())

// Check if statefulset replicas is scaled up to 1
s = &appsv1.StatefulSet{}
Eventually(func() error { return statefulsetIsScaled(c, instance, s, 1) }, timeout, pollingInterval).Should(BeNil())

// Check if statefulset replicas is scaled up to etcd replicas
s = &appsv1.StatefulSet{}
Eventually(func() error { return statefulsetIsScaled(c, instance, s, instance.Spec.Replicas) }, timeout, pollingInterval).Should(BeNil())

})

AfterEach(func() {
Expect(c.Delete(context.TODO(), instance)).To(Succeed())
Eventually(func() error { return statefulSetRemoved(c, s) }, timeout, pollingInterval).Should(BeNil())
Eventually(func() error { return etcdRemoved(c, instance) }, timeout, pollingInterval).Should(BeNil())
})
})
})

// getMultinodeEtcdDefault returns an Etcd object with the given name and
// namespace, three replicas, and empty Backup, Etcd and Common sections. The
// labels and the selector carry the same "name"/"instance" pairs but are backed
// by separate maps.
func getMultinodeEtcdDefault(name, namespace string) *druidv1alpha1.Etcd {
	spec := druidv1alpha1.EtcdSpec{
		Replicas: 3,
		Backup:   druidv1alpha1.BackupSpec{},
		Etcd:     druidv1alpha1.EtcdConfig{},
		Common:   druidv1alpha1.SharedConfig{},
	}
	spec.Annotations = map[string]string{
		"app":      "etcd-statefulset",
		"role":     "test",
		"instance": name,
	}
	spec.Labels = map[string]string{
		"name":     "etcd",
		"instance": name,
	}
	spec.Selector = &metav1.LabelSelector{
		MatchLabels: map[string]string{
			"name":     "etcd",
			"instance": name,
		},
	}

	return &druidv1alpha1.Etcd{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Spec: spec,
	}
}

// fetchMemberLease reads the member lease of the given replica index for the
// Etcd instance into lease, bounded by the test timeout. It returns the client
// error if the read fails, or an error if the lease carries no owner reference
// to the Etcd instance.
func fetchMemberLease(c client.Client, instance *druidv1alpha1.Etcd, lease *coordinationv1.Lease, replica int) error {
	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
	defer cancel()

	key := types.NamespacedName{
		Namespace: instance.Namespace,
		Name:      memberLeaseName(instance.Name, replica),
	}

	err := c.Get(ctx, key, lease)
	switch {
	case err != nil:
		return err
	case !checkEtcdOwnerReference(lease.GetOwnerReferences(), instance):
		return fmt.Errorf("ownerReference does not exists for lease")
	default:
		return nil
	}
}

// memberLeaseName builds the member lease name for a replica index in the form
// "<etcdName>-<replica>-member".
func memberLeaseName(etcdName string, replica int) string {
	leaseName := fmt.Sprintf("%s-%d-member", etcdName, replica)
	return leaseName
}

// statefulsetIsScaled reads the StatefulSet named after the Etcd instance into
// ss, bounded by the test timeout, and returns an error unless its
// spec.replicas equals replicas.
func statefulsetIsScaled(c client.Client, instance *druidv1alpha1.Etcd, ss *appsv1.StatefulSet, replicas int32) error {
	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
	defer cancel()

	key := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}
	if err := c.Get(ctx, key, ss); err != nil {
		return err
	}

	if current := *ss.Spec.Replicas; current != replicas {
		return fmt.Errorf("statefulset replicas are yet %d instead of %d", current, replicas)
	}
	return nil
}
2 changes: 1 addition & 1 deletion pkg/component/etcd/lease/lease_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,5 @@ func getMemberLeaseLabels(val Values) map[string]string {
}

func memberLeaseName(etcdName string, replica int) string {
return fmt.Sprintf("%s-%d", etcdName, replica)
return fmt.Sprintf("%s-%d-member", etcdName, replica)
}
2 changes: 1 addition & 1 deletion pkg/component/etcd/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func checkMemberLeases(ctx context.Context, c client.Client, etcd *druidv1alpha1
func memberLeases(name string, etcdUID types.UID, replicas int32) []interface{} {
var elements []interface{}
for i := 0; i < int(replicas); i++ {
elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d", name, i), name, etcdUID))
elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d-member", name, i), name, etcdUID))
}

return elements
Expand Down
Loading

0 comments on commit 988bb5b

Please sign in to comment.