Skip to content

Commit

Permalink
Addressed Stoyan and Tim's comment on 11 Nov.
Browse files Browse the repository at this point in the history
  • Loading branch information
abdasgupta committed Nov 11, 2021
1 parent 56cf3a1 commit efa8681
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,32 @@ import (
)

const DefaultETCDQuota = 8 * 1024 * 1024 * 1024 // 8Gi
// LeaseController reconciles compaction job
type LeaseController struct {
// CompactionLeaseController reconciles compaction job
type CompactionLeaseController struct {
client.Client
logger logr.Logger
ImageVector imagevector.ImageVector
config controllersconfig.CompactionConfig
config controllersconfig.CompactionLeaseConfig
}

// NewLeaseController creates a new LeaseController object
func NewLeaseController(mgr manager.Manager, config controllersconfig.CompactionConfig) *LeaseController {
return &LeaseController{
// NewCompactionLeaseController creates a new LeaseController object
func NewCompactionLeaseController(mgr manager.Manager, config controllersconfig.CompactionLeaseConfig) *CompactionLeaseController {
return &CompactionLeaseController{
Client: mgr.GetClient(),
logger: log.Log.WithName("lease-controller"),
logger: log.Log.WithName("compaction-lease-controller"),
config: config,
}
}

// NewLeaseControllerWithImageVector creates a new LeaseController object
func NewLeaseControllerWithImageVector(mgr manager.Manager, config controllersconfig.CompactionConfig) (*LeaseController, error) {
lc := NewLeaseController(mgr, config)
// NewCompactionLeaseControllerWithImageVector creates a new LeaseController object
func NewCompactionLeaseControllerWithImageVector(mgr manager.Manager, config controllersconfig.CompactionLeaseConfig) (*CompactionLeaseController, error) {
lc := NewCompactionLeaseController(mgr, config)
return lc.InitializeControllerWithImageVector()
}

// InitializeControllerWithImageVector will use LeaseController client to initialize image vector for etcd
// and backup restore images.
func (lc *LeaseController) InitializeControllerWithImageVector() (*LeaseController, error) {
func (lc *CompactionLeaseController) InitializeControllerWithImageVector() (*CompactionLeaseController, error) {
imageVector, err := imagevector.ReadGlobalImageVectorWithEnvOverride(getImageYAMLPath())
if err != nil {
return nil, err
Expand All @@ -84,7 +84,7 @@ func (lc *LeaseController) InitializeControllerWithImageVector() (*LeaseControll
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;create;list;watch;update;patch;delete

// Reconcile reconciles the compaction job.
func (lc *LeaseController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (lc *CompactionLeaseController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
lc.logger.Info("Lease controller reconciliation started")
etcd := &druidv1alpha1.Etcd{}
if err := lc.Get(ctx, req.NamespacedName, etcd); err != nil {
Expand Down Expand Up @@ -168,7 +168,7 @@ func (lc *LeaseController) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}, nil
}

func (lc *LeaseController) reconcileJob(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (ctrl.Result, error) {
func (lc *CompactionLeaseController) reconcileJob(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (ctrl.Result, error) {
logger.Info("Reconcile etcd compaction job")

// First check if a job is already running
Expand Down Expand Up @@ -214,7 +214,7 @@ func (lc *LeaseController) reconcileJob(ctx context.Context, logger logr.Logger,
return ctrl.Result{Requeue: false}, nil
}

func (lc *LeaseController) delete(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (ctrl.Result, error) {
func (lc *CompactionLeaseController) delete(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (ctrl.Result, error) {
job := &batchv1.Job{}
err := lc.Get(ctx, types.NamespacedName{Name: getJobName(etcd), Namespace: etcd.Namespace}, job)
if err != nil {
Expand All @@ -237,7 +237,7 @@ func (lc *LeaseController) delete(ctx context.Context, logger logr.Logger, etcd
return ctrl.Result{Requeue: false}, nil
}

func (lc *LeaseController) createCompactJob(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (*batchv1.Job, error) {
func (lc *CompactionLeaseController) createCompactJob(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (*batchv1.Job, error) {
activeDeadlineSeconds := lc.config.ActiveDeadlineDuration.Seconds()

_, etcdBackupImage, err := getEtcdImages(lc.ImageVector, etcd)
Expand Down Expand Up @@ -531,7 +531,7 @@ func getCompactJobCommands(etcd *druidv1alpha1.Etcd) []string {
}

// SetupWithManager sets up manager with a new controller and ec as the reconcile.Reconciler
func (lc *LeaseController) SetupWithManager(mgr ctrl.Manager, workers int) error {
func (lc *CompactionLeaseController) SetupWithManager(mgr ctrl.Manager, workers int) error {
builder := ctrl.NewControllerManagedBy(mgr).WithOptions(controller.Options{
MaxConcurrentReconciles: workers,
})
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions controllers/config/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package config

import "time"

// CompactionConfig contains configuration for the compaction controller.
type CompactionConfig struct {
// CompactionLeaseConfig contains configuration for the compaction controller.
type CompactionLeaseConfig struct {
// ActiveDeadlineDuration is the duration after which a running compaction job will be killed (Ex: "300ms", "20s", "-1.5h" or "2h45m")
ActiveDeadlineDuration time.Duration
// EventsThreshold is total number of etcd events that can be allowed before a backup compaction job is triggered
Expand Down
245 changes: 0 additions & 245 deletions controllers/controller_ref_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import (

druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
"github.com/gardener/etcd-druid/pkg/common"
"github.com/gardener/etcd-druid/pkg/utils"
"github.com/gardener/gardener/pkg/utils/imagevector"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -523,246 +521,3 @@ func CheckStatefulSet(etcd *druidv1alpha1.Etcd, statefulSet *appsv1.StatefulSet)

return nil
}

func getMapFromEtcd(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (map[string]interface{}, error) {
var statefulsetReplicas int
if etcd.Spec.Replicas != 0 {
statefulsetReplicas = 1
}

etcdValues := map[string]interface{}{
"defragmentationSchedule": etcd.Spec.Etcd.DefragmentationSchedule,
"enableTLS": (etcd.Spec.Etcd.TLS != nil),
"pullPolicy": corev1.PullIfNotPresent,
// "username": etcd.Spec.Etcd.Username,
// "password": etcd.Spec.Etcd.Password,
}

if etcd.Spec.Etcd.Resources != nil {
etcdValues["resources"] = etcd.Spec.Etcd.Resources
}

if etcd.Spec.Etcd.Metrics != nil {
etcdValues["metrics"] = etcd.Spec.Etcd.Metrics
}

if etcd.Spec.Etcd.ServerPort != nil {
etcdValues["serverPort"] = etcd.Spec.Etcd.ServerPort
}

if etcd.Spec.Etcd.ClientPort != nil {
etcdValues["clientPort"] = etcd.Spec.Etcd.ClientPort
}

if etcd.Spec.Etcd.EtcdDefragTimeout != nil {
etcdValues["etcdDefragTimeout"] = etcd.Spec.Etcd.EtcdDefragTimeout
}

etcdImage, etcdBackupImage, err := getEtcdImages(im, etcd)
if err != nil {
return map[string]interface{}{}, err
}

if etcd.Spec.Etcd.Image == nil {
if etcdImage == "" {
return map[string]interface{}{}, fmt.Errorf("either etcd resource or image vector should have %s image", common.Etcd)
}
etcdValues["image"] = etcdImage
} else {
etcdValues["image"] = etcd.Spec.Etcd.Image
}
var quota int64 = 8 * 1024 * 1024 * 1024 // 8Gi
if etcd.Spec.Etcd.Quota != nil {
quota = etcd.Spec.Etcd.Quota.Value()
}

var deltaSnapshotMemoryLimit int64 = 100 * 1024 * 1024 // 100Mi
if etcd.Spec.Backup.DeltaSnapshotMemoryLimit != nil {
deltaSnapshotMemoryLimit = etcd.Spec.Backup.DeltaSnapshotMemoryLimit.Value()
}

var enableProfiling = false
if etcd.Spec.Backup.EnableProfiling != nil {
enableProfiling = *etcd.Spec.Backup.EnableProfiling
}

backupValues := map[string]interface{}{
"pullPolicy": corev1.PullIfNotPresent,
"etcdQuotaBytes": quota,
"etcdConnectionTimeout": "5m",
"snapstoreTempDir": "/var/etcd/data/temp",
"deltaSnapshotMemoryLimit": deltaSnapshotMemoryLimit,
"enableProfiling": enableProfiling,
}

if etcd.Spec.Backup.Resources != nil {
backupValues["resources"] = etcd.Spec.Backup.Resources
}

if etcd.Spec.Backup.FullSnapshotSchedule != nil {
backupValues["fullSnapshotSchedule"] = etcd.Spec.Backup.FullSnapshotSchedule
}

if etcd.Spec.Backup.GarbageCollectionPolicy != nil {
backupValues["garbageCollectionPolicy"] = etcd.Spec.Backup.GarbageCollectionPolicy
}

if etcd.Spec.Backup.GarbageCollectionPeriod != nil {
backupValues["garbageCollectionPeriod"] = etcd.Spec.Backup.GarbageCollectionPeriod
}

if etcd.Spec.Backup.DeltaSnapshotPeriod != nil {
backupValues["deltaSnapshotPeriod"] = etcd.Spec.Backup.DeltaSnapshotPeriod
}

if etcd.Spec.Backup.EtcdSnapshotTimeout != nil {
backupValues["etcdSnapshotTimeout"] = etcd.Spec.Backup.EtcdSnapshotTimeout
}

if etcd.Spec.Backup.Port != nil {
backupValues["port"] = etcd.Spec.Backup.Port
}

if etcd.Spec.Backup.SnapshotCompression != nil {
compressionValues := make(map[string]interface{})
if etcd.Spec.Backup.SnapshotCompression.Enabled {
compressionValues["enabled"] = etcd.Spec.Backup.SnapshotCompression.Enabled
}
if etcd.Spec.Backup.SnapshotCompression.Policy != nil {
compressionValues["policy"] = etcd.Spec.Backup.SnapshotCompression.Policy
}
backupValues["compression"] = compressionValues
}

if etcd.Spec.Backup.Image == nil {
if etcdBackupImage == "" {
return map[string]interface{}{}, fmt.Errorf("either etcd resource or image vector should have %s image", common.BackupRestore)
}
backupValues["image"] = etcdBackupImage
} else {
backupValues["image"] = etcd.Spec.Backup.Image
}

if etcd.Spec.Backup.OwnerCheck != nil {
ownerCheckValues := map[string]interface{}{
"name": etcd.Spec.Backup.OwnerCheck.Name,
"id": etcd.Spec.Backup.OwnerCheck.ID,
}
if etcd.Spec.Backup.OwnerCheck.Interval != nil {
ownerCheckValues["interval"] = etcd.Spec.Backup.OwnerCheck.Interval
}
if etcd.Spec.Backup.OwnerCheck.Timeout != nil {
ownerCheckValues["timeout"] = etcd.Spec.Backup.OwnerCheck.Timeout
}
if etcd.Spec.Backup.OwnerCheck.DNSCacheTTL != nil {
ownerCheckValues["dnsCacheTTL"] = etcd.Spec.Backup.OwnerCheck.DNSCacheTTL
}
backupValues["ownerCheck"] = ownerCheckValues
}

volumeClaimTemplateName := etcd.Name
if etcd.Spec.VolumeClaimTemplate != nil && len(*etcd.Spec.VolumeClaimTemplate) != 0 {
volumeClaimTemplateName = *etcd.Spec.VolumeClaimTemplate
}

sharedConfigValues := map[string]interface{}{
"autoCompactionMode": druidv1alpha1.Periodic,
"autoCompactionRetention": DefaultAutoCompactionRetention,
}

if etcd.Spec.Common.AutoCompactionMode != nil {
sharedConfigValues["autoCompactionMode"] = etcd.Spec.Common.AutoCompactionMode
}

if etcd.Spec.Common.AutoCompactionRetention != nil {
sharedConfigValues["autoCompactionRetention"] = etcd.Spec.Common.AutoCompactionRetention
}

values := map[string]interface{}{
"name": etcd.Name,
"uid": etcd.UID,
"selector": etcd.Spec.Selector,
"labels": etcd.Spec.Labels,
"annotations": etcd.Spec.Annotations,
"etcd": etcdValues,
"backup": backupValues,
"sharedConfig": sharedConfigValues,
"replicas": etcd.Spec.Replicas,
"statefulsetReplicas": statefulsetReplicas,
"serviceName": fmt.Sprintf("%s-client", etcd.Name),
"configMapName": fmt.Sprintf("etcd-bootstrap-%s", string(etcd.UID[:6])),
"fullSnapLeaseName": getFullSnapshotLeaseName(etcd),
"deltaSnapLeaseName": getDeltaSnapshotLeaseName(etcd),
"jobName": getJobName(etcd),
"volumeClaimTemplateName": volumeClaimTemplateName,
"serviceAccountName": getServiceAccountName(etcd),
"roleName": fmt.Sprintf("%s-br-role", etcd.Name),
"roleBindingName": fmt.Sprintf("%s-br-rolebinding", etcd.Name),
}

if etcd.Spec.StorageCapacity != nil {
values["storageCapacity"] = etcd.Spec.StorageCapacity
}

if etcd.Spec.StorageClass != nil {
values["storageClass"] = etcd.Spec.StorageClass
}

if etcd.Spec.PriorityClassName != nil {
values["priorityClassName"] = *etcd.Spec.PriorityClassName
}

if etcd.Spec.Etcd.TLS != nil {
values["tlsServerSecret"] = etcd.Spec.Etcd.TLS.ServerTLSSecretRef.Name
values["tlsClientSecret"] = etcd.Spec.Etcd.TLS.ClientTLSSecretRef.Name
values["tlsCASecret"] = etcd.Spec.Etcd.TLS.TLSCASecretRef.Name
}

if etcd.Spec.Backup.Store != nil {
if values["store"], err = utils.GetStoreValues(etcd.Spec.Backup.Store); err != nil {
return nil, err
}
}

return values, nil
}

func getServiceAccountName(etcd *druidv1alpha1.Etcd) string {
return fmt.Sprintf("%s-br-serviceaccount", etcd.Name)
}

func getEtcdImages(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (string, string, error) {
var (
err error
images map[string]*imagevector.Image
etcdImage, etcdBackupImage string
)

imageNames := []string{
common.Etcd,
common.BackupRestore,
}

if etcd.Spec.Etcd.Image == nil || etcd.Spec.Backup.Image == nil {

images, err = imagevector.FindImages(im, imageNames)
if err != nil {
return "", "", err
}
}

val, ok := images[common.Etcd]
if !ok {
etcdImage = ""
} else {
etcdImage = val.String()
}

val, ok = images[common.BackupRestore]
if !ok {
etcdBackupImage = ""
} else {
etcdBackupImage = val.String()
}
return etcdImage, etcdBackupImage, nil
}
2 changes: 1 addition & 1 deletion controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ var _ = BeforeSuite(func(done Done) {
activeDeadlineDuration, err = time.ParseDuration("2m")
Expect(err).NotTo(HaveOccurred())

lc, err := NewLeaseControllerWithImageVector(mgr, controllersconfig.CompactionConfig{
lc, err := NewCompactionLeaseControllerWithImageVector(mgr, controllersconfig.CompactionLeaseConfig{
EventsThreshold: 1000000,
ActiveDeadlineDuration: activeDeadlineDuration,
})
Expand Down
Loading

0 comments on commit efa8681

Please sign in to comment.