Skip to content

Commit

Permalink
Deploy statefulset as component.
Browse files Browse the repository at this point in the history
  • Loading branch information
abdasgupta committed Jul 4, 2022
1 parent 3d721af commit 361ba9c
Show file tree
Hide file tree
Showing 4 changed files with 549 additions and 305 deletions.
312 changes: 7 additions & 305 deletions controllers/etcd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
componentconfigmap "github.com/gardener/etcd-druid/pkg/component/etcd/configmap"
componentlease "github.com/gardener/etcd-druid/pkg/component/etcd/lease"
componentservice "github.com/gardener/etcd-druid/pkg/component/etcd/service"
"github.com/gardener/etcd-druid/pkg/component/etcd/statefulset"
druidpredicates "github.com/gardener/etcd-druid/pkg/predicate"
"github.com/gardener/etcd-druid/pkg/utils"

Expand All @@ -53,7 +54,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -370,7 +370,8 @@ func (r *EtcdReconciler) delete(ctx context.Context, etcd *druidv1alpha1.Etcd) (
}
}

if waitForStatefulSetCleanup, err := r.removeDependantStatefulset(ctx, logger, etcd); err != nil {
stsDeployer := componentSts.New(r.Client, etcd.Namespace, componentSts.GenerateValues(etcd))
if err := stsDeployer.Destroy(ctx); err != nil {
if err = r.updateEtcdErrorStatus(ctx, etcd, nil, err); err != nil {
return ctrl.Result{
Requeue: true,
Expand All @@ -379,10 +380,6 @@ func (r *EtcdReconciler) delete(ctx context.Context, etcd *druidv1alpha1.Etcd) (
return ctrl.Result{
Requeue: true,
}, err
} else if waitForStatefulSetCleanup {
return ctrl.Result{
RequeueAfter: 30 * time.Second,
}, nil
}

leaseDeployer := componentlease.New(r.Client, etcd.Namespace, componentlease.GenerateValues(etcd))
Expand Down Expand Up @@ -468,236 +465,6 @@ func (r *EtcdReconciler) getPodDisruptionBudgetFromEtcd(etcd *druidv1alpha1.Etcd
return nil, fmt.Errorf("missing podDisruptionBudget template file in the charts: %v", pdbPath)
}

// reconcileStatefulSet ensures that a single StatefulSet claimed by the given
// etcd resource exists and reflects the supplied chart values.
//
// StatefulSets matching etcd.Spec.Selector are fetched and claimed through the
// ref manager. If at least one is claimed, every claimed StatefulSet except
// the first is deleted (best effort), the remaining one is re-read and synced
// via syncStatefulSetSpec, pods reported by utils.IsPodInCrashloopBackoff are
// deleted, and the result of waitUntilStatefulSetReady is returned. If nothing
// is claimed, a StatefulSet is rendered from the chart, created, and then
// waitUntilStatefulSetReady is called for it.
//
// On any error other than the explicitly tolerated ones, a nil StatefulSet is
// returned together with the error.
func (r *EtcdReconciler) reconcileStatefulSet(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd, values map[string]interface{}) (*appsv1.StatefulSet, error) {
	logger.Info("Reconciling etcd statefulset")

	// Before adopting anything, re-read the etcd resource and refuse adoption if
	// it is being deleted or has been replaced by an object with another UID.
	// NOTE(review): the pattern (and the "#42639" reference in the previous
	// comment) looks borrowed from upstream Kubernetes controllers — confirm.
	canAdoptFunc := RecheckDeletionTimestamp(func() (metav1.Object, error) {
		foundEtcd := &druidv1alpha1.Etcd{}
		// Use the caller's context so that the re-check honours cancellation.
		if err := r.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, foundEtcd); err != nil {
			return nil, err
		}

		if foundEtcd.GetDeletionTimestamp() != nil {
			return nil, fmt.Errorf("%v/%v etcd is marked for deletion", etcd.Namespace, etcd.Name)
		}

		if foundEtcd.UID != etcd.UID {
			return nil, fmt.Errorf("original %v/%v etcd gone: got uid %v, wanted %v", etcd.Namespace, etcd.Name, foundEtcd.UID, etcd.UID)
		}
		return foundEtcd, nil
	})

	selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
	if err != nil {
		logger.Error(err, "Error converting etcd selector to selector")
		return nil, err
	}
	dm := NewEtcdDruidRefManager(r.Client, r.Scheme, etcd, selector, etcdGVK, canAdoptFunc)
	statefulSets, err := dm.FetchStatefulSet(ctx, etcd)
	if err != nil {
		logger.Error(err, "Error while fetching StatefulSet")
		return nil, err
	}

	logger.Info("Claiming existing etcd StatefulSet")
	claimedStatefulSets, err := dm.ClaimStatefulsets(ctx, statefulSets)
	if err != nil {
		return nil, err
	}

	if len(claimedStatefulSets) > 0 {
		// Keep only the first claimed StatefulSet and delete the rest. A failed
		// deletion is logged but does not abort the reconciliation.
		for _, duplicate := range claimedStatefulSets[1:] {
			key := kutil.Key(duplicate.Namespace, duplicate.Name).String()
			logger.Info("Found duplicate StatefulSet, deleting it", "statefulset", key)
			if err := r.Delete(ctx, duplicate); err != nil {
				logger.Error(err, "Error in deleting duplicate StatefulSet", "statefulset", key)
			}
		}

		// Fetch the updated statefulset
		// TODO: (timuthy) Check if this is really needed.
		sts := &appsv1.StatefulSet{}
		if err := r.Get(ctx, types.NamespacedName{Name: claimedStatefulSets[0].Name, Namespace: claimedStatefulSets[0].Namespace}, sts); err != nil {
			return nil, err
		}

		// The StatefulSet is claimed for this etcd. Just sync the specs.
		if sts, err = r.syncStatefulSetSpec(ctx, logger, sts, etcd, values); err != nil {
			return nil, err
		}

		// Restart etcd pods that are stuck in crashloop backoff by deleting them.
		podSelector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector)
		if err != nil {
			logger.Error(err, "error converting StatefulSet selector to selector")
			return nil, err
		}
		podList := &corev1.PodList{}
		if err := r.List(ctx, podList, client.InNamespace(etcd.Namespace), client.MatchingLabelsSelector{Selector: podSelector}); err != nil {
			return nil, err
		}

		// Index into the slice instead of taking the address of the range
		// variable, so the pointer handed to Delete refers to the list element.
		for i := range podList.Items {
			pod := &podList.Items[i]
			if !utils.IsPodInCrashloopBackoff(pod.Status) {
				continue
			}
			if err := r.Delete(ctx, pod); err != nil {
				logger.Error(err, fmt.Sprintf("error deleting etcd pod in crashloop: %s/%s", pod.Namespace, pod.Name))
				return nil, err
			}
		}

		return r.waitUntilStatefulSetReady(ctx, logger, etcd, sts)
	}

	// Required statefulset doesn't exist. Create a new one.
	sts, err := r.getStatefulSetFromEtcd(etcd, values)
	if err != nil {
		return nil, err
	}

	err = r.Create(ctx, sts)

	// A violated precondition is tolerated and not reported as a failure.
	if err == errorsutil.ErrPreconditionViolated {
		// logr does not interpolate format verbs; the name goes into the key/value pair.
		logger.Info("StatefulSet precondition doesn't hold, skip updating it", "statefulset", kutil.Key(sts.Namespace, sts.Name).String())
		err = nil
	}
	if err != nil {
		return nil, err
	}

	return r.waitUntilStatefulSetReady(ctx, logger, etcd, sts)
}

// getContainerMapFromPodTemplateSpec indexes the containers listed in
// spec.Containers by their name. If several containers share a name, the last
// one in the list wins.
func getContainerMapFromPodTemplateSpec(spec corev1.PodSpec) map[string]corev1.Container {
	byName := map[string]corev1.Container{}
	for idx := range spec.Containers {
		byName[spec.Containers[idx].Name] = spec.Containers[idx]
	}
	return byName
}

// clusterScaledUpToMultiNode reports whether the spec of the given etcd asks
// for a replica count other than 1 while its status still records 0 or 1
// replicas. A nil etcd yields false.
func clusterScaledUpToMultiNode(etcd *druidv1alpha1.Etcd) bool {
	if etcd == nil || etcd.Spec.Replicas == 1 {
		return false
	}

	switch etcd.Status.Replicas {
	case 0, 1:
		// `0` is accepted as well because this field was not maintained in earlier releases.
		return true
	default:
		return false
	}
}

// syncStatefulSetSpec brings the existing StatefulSet ss in line with the
// StatefulSet rendered from the chart for the given etcd and values.
//
// If the existing and the rendered spec are deeply equal, ss is returned
// unchanged. Otherwise a copy of ss takes over replicas, update strategy and
// pod template from the rendered object, with one exception: every rendered
// container keeps the resource requests of the same-named container of the
// existing StatefulSet. The change is applied either as a patch computed with
// client.MergeFrom(ss) (retried on conflict) or, when the selector differs or
// the service name differs while clusterScaledUpToMultiNode(etcd) holds, by
// recreating the StatefulSet from the rendered object.
//
// It returns the modified copy of ss, or an error if rendering fails, a
// rendered container has no same-named counterpart in the existing
// StatefulSet, or the patch/recreation fails.
func (r *EtcdReconciler) syncStatefulSetSpec(ctx context.Context, logger logr.Logger, ss *appsv1.StatefulSet, etcd *druidv1alpha1.Etcd, values map[string]interface{}) (*appsv1.StatefulSet, error) {
	decoded, err := r.getStatefulSetFromEtcd(etcd, values)
	if err != nil {
		return nil, err
	}

	// Nothing to do when the live spec already matches the rendered one.
	if reflect.DeepEqual(ss.Spec, decoded.Spec) {
		return ss, nil
	}

	ssCopy := ss.DeepCopy()
	ssCopy.Spec.Replicas = decoded.Spec.Replicas
	ssCopy.Spec.UpdateStrategy = decoded.Spec.UpdateStrategy

	// A differing selector is handled by recreating the StatefulSet instead of
	// patching it.
	recreateSTS := false
	if !reflect.DeepEqual(ssCopy.Spec.Selector, decoded.Spec.Selector) {
		recreateSTS = true
	}

	// We introduced a peer service for multi-node etcd which must be set
	// when the previous single-node StatefulSet still has the client service configured.
	if ssCopy.Spec.ServiceName != decoded.Spec.ServiceName {
		if clusterScaledUpToMultiNode(etcd) {
			recreateSTS = true
		}
	}

	// Carry the resource requests of the existing containers over into the
	// rendered pod template. Note that this mutates decoded, which is also the
	// object used for recreation below.
	containers := getContainerMapFromPodTemplateSpec(ssCopy.Spec.Template.Spec)
	for i, c := range decoded.Spec.Template.Spec.Containers {
		container, ok := containers[c.Name]
		if !ok {
			return nil, fmt.Errorf("container with name %s could not be fetched from statefulset %s", c.Name, decoded.Name)
		}
		// only copy requested resources from the existing stateful set to avoid copying already removed (from the etcd resource) resource limits
		decoded.Spec.Template.Spec.Containers[i].Resources.Requests = container.Resources.Requests
	}

	ssCopy.Spec.Template = decoded.Spec.Template

	if recreateSTS {
		logger.Info("StatefulSet change requires recreation", "statefulset", kutil.Key(ssCopy.Namespace, ssCopy.Name).String())
		// The recreation uses the rendered object, not ssCopy.
		err = r.recreateStatefulset(ctx, decoded)
	} else {
		err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
			return r.Patch(ctx, ssCopy, client.MergeFrom(ss))
		})
	}

	// A violated precondition is tolerated and not reported as a failure.
	if err == errorsutil.ErrPreconditionViolated {
		logger.Info("StatefulSet precondition doesn't hold, skip updating it", "statefulset", kutil.Key(ss.Namespace, ss.Name).String())
		err = nil
	}
	if err != nil {
		return nil, err
	}
	// err is always nil at this point.
	return ssCopy, err
}

// recreateStatefulset deletes the given StatefulSet and creates it again,
// retrying with the default backoff whenever a conflict error is returned.
// A "not found" error on deletion is ignored. Once the deletion has gone
// through, retries only repeat the creation.
func (r *EtcdReconciler) recreateStatefulset(ctx context.Context, ss *appsv1.StatefulSet) error {
	deleted := false
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		if !deleted {
			delErr := r.Delete(ctx, ss)
			if delErr != nil && !apierrors.IsNotFound(delErr) {
				return delErr
			}
			deleted = true
		}
		return r.Create(ctx, ss)
	})
}

// getStatefulSetFromEtcd renders the etcd chart with the given values and
// decodes the StatefulSet template of the rendered output.
//
// The chart is rendered with etcd.Name as release name and etcd.Namespace as
// namespace. An error is returned if rendering fails, if the rendered chart
// contains no file at the StatefulSet template path, or if that file cannot be
// decoded.
func (r *EtcdReconciler) getStatefulSetFromEtcd(etcd *druidv1alpha1.Etcd, values map[string]interface{}) (*appsv1.StatefulSet, error) {
	statefulSetPath := getChartPathForStatefulSet()
	renderedChart, err := r.chartApplier.Render(getChartPath(), etcd.Name, etcd.Namespace, values)
	if err != nil {
		return nil, err
	}

	// Look the file up once and reuse its content for decoding.
	content, ok := renderedChart.Files()[statefulSetPath]
	if !ok {
		return nil, fmt.Errorf("missing statefulset template file in the charts: %v", statefulSetPath)
	}

	// Decode straight into the allocated object; decoding through a pointer to
	// the pointer could reset it to nil for a `null` document.
	decoded := &appsv1.StatefulSet{}
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(content)), 1024)
	if err := decoder.Decode(decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}

// decodeObject looks up path among the files of the rendered chart and decodes
// its YAML or JSON content into object. It returns an error if the rendered
// chart has no such file or if decoding fails.
func decodeObject(renderedChart *chartrenderer.RenderedChart, path string, object interface{}) error {
	content, found := renderedChart.Files()[path]
	if !found {
		return fmt.Errorf("missing file %s in the rendered chart", path)
	}

	reader := bytes.NewReader([]byte(content))
	return yaml.NewYAMLOrJSONDecoder(reader, 1024).Decode(&object)
}

func (r *EtcdReconciler) reconcileServiceAccount(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd, values map[string]interface{}) error {
logger.Info("Reconciling serviceaccount")
var err error
Expand Down Expand Up @@ -841,9 +608,10 @@ func (r *EtcdReconciler) reconcileEtcd(ctx context.Context, logger logr.Logger,
}

val := componentetcd.Values{
ConfigMap: componentconfigmap.GenerateValues(etcd),
Lease: componentlease.GenerateValues(etcd),
Service: componentservice.GenerateValues(etcd),
ConfigMap: componentconfigmap.GenerateValues(etcd),
Lease: componentlease.GenerateValues(etcd),
Service: componentservice.GenerateValues(etcd),
StatefulSet: statefulset.GenerateValues(etcd),
}

leaseDeployer := componentlease.New(r.Client, etcd.Namespace, val.Lease)
Expand Down Expand Up @@ -897,34 +665,6 @@ func (r *EtcdReconciler) reconcileEtcd(ctx context.Context, logger logr.Logger,
return &val.Service.ClientServiceName, sts, nil
}

// checkEtcdOwnerReference reports whether any of the given owner references
// carries the UID of the given etcd.
func checkEtcdOwnerReference(refs []metav1.OwnerReference, etcd *druidv1alpha1.Etcd) bool {
	for idx := range refs {
		if refs[idx].UID == etcd.UID {
			return true
		}
	}
	return false
}

// checkEtcdAnnotations reports whether the annotations mark an object as owned
// by the given etcd: the common.GardenerOwnedBy annotation must equal
// "<namespace>/<name>" of the etcd and the common.GardenerOwnerType annotation
// must equal the lower-cased kind of etcdGVK. Missing annotations yield false.
func checkEtcdAnnotations(annotations map[string]string, etcd metav1.Object) bool {
	// A lookup in a nil map reports "not present", which covers the nil case.
	owner, hasOwner := annotations[common.GardenerOwnedBy]
	if !hasOwner {
		return false
	}
	kind, hasKind := annotations[common.GardenerOwnerType]
	if !hasKind {
		return false
	}

	expectedOwner := fmt.Sprintf("%s/%s", etcd.GetNamespace(), etcd.GetName())
	if owner != expectedOwner {
		return false
	}
	return kind == strings.ToLower(etcdGVK.Kind)
}

func (r *EtcdReconciler) getMapFromEtcd(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd, val componentetcd.Values, disableEtcdServiceAccountAutomount bool) (map[string]interface{}, error) {
statefulsetReplicas := int(etcd.Spec.Replicas)

Expand Down Expand Up @@ -1207,44 +947,6 @@ func getEtcdImages(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (string
return etcdImage, etcdBackupImage, nil
}

// removeDependantStatefulset deletes the StatefulSets in the etcd's namespace
// that match etcd.Spec.Selector and pass canDeleteStatefulset, using background
// propagation.
//
// waitForStatefulSetCleanup is true when at least one deletion was issued and
// none failed, signalling that the caller has to wait for the cleanup. On the
// first error the function returns false together with that error.
func (r *EtcdReconciler) removeDependantStatefulset(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (waitForStatefulSetCleanup bool, err error) {
	selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
	if err != nil {
		return false, err
	}

	candidates := &appsv1.StatefulSetList{}
	err = r.List(ctx, candidates, client.InNamespace(etcd.Namespace), client.MatchingLabelsSelector{Selector: selector})
	if err != nil {
		return false, err
	}

	for idx := range candidates.Items {
		candidate := &candidates.Items[idx]
		if !canDeleteStatefulset(candidate, etcd) {
			continue
		}

		logger.Info("Deleting statefulset", "statefulset", kutil.Key(candidate.GetNamespace(), candidate.GetName()).String())
		if delErr := r.Delete(ctx, candidate, client.PropagationPolicy(metav1.DeletePropagationBackground)); delErr != nil {
			return false, delErr
		}

		// The deletion request succeeded. Now we need to wait for the cleanup.
		waitForStatefulSetCleanup = true
	}

	return waitForStatefulSetCleanup, nil
}

// canDeleteStatefulset reports whether the StatefulSet belongs to the given
// etcd, either through an owner reference or through the ownership
// annotations.
//
// The owner reference is checked as well so that all StatefulSets share the
// same delete path: one with an owner reference would be removed automatically
// when the etcd is deleted, but it is deleted explicitly for uniformity.
func canDeleteStatefulset(sts *appsv1.StatefulSet, etcd *druidv1alpha1.Etcd) bool {
	if checkEtcdOwnerReference(sts.GetOwnerReferences(), etcd) {
		return true
	}
	return checkEtcdAnnotations(sts.GetAnnotations(), etcd)
}

func bootstrapReset(etcd *druidv1alpha1.Etcd) {
etcd.Status.Members = nil
etcd.Status.ClusterSize = pointer.Int32Ptr(etcd.Spec.Replicas)
Expand Down
Loading

0 comments on commit 361ba9c

Please sign in to comment.