Skip to content

Commit

Permalink
feat: make it possible to hot change the podType
Browse files Browse the repository at this point in the history
  • Loading branch information
Massimo Gengarelli committed Jul 1, 2024
1 parent ec20449 commit 26b1783
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 55 deletions.
114 changes: 91 additions & 23 deletions internal/watcher/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"k8s.io/client-go/tools/record"
)

type WatcherConfiguration struct {
Configuration *v1alpha1.ChaosMonkeyConfiguration
Watcher ConfigurableWatcher
}

type CrdWatcher struct {
cmv1alpha1.ChaosMonkeyConfigurationInterface
appsv1.DeploymentInterface
Expand All @@ -31,7 +36,7 @@ type CrdWatcher struct {
Logrus logrus.FieldLogger
Client kubernetes.Interface
Mutex *sync.Mutex
DeploymentWatchers map[string]ConfigurableWatcher
DeploymentWatchers map[string]*WatcherConfiguration
ForceStopChan chan interface{}
Namespace string
CleanupTimeout time.Duration
Expand All @@ -58,7 +63,7 @@ func NewCrdWatcher(clientset kubernetes.Interface, cmcClientset typedcmc.Interfa
Logrus: logrus.WithFields(logrus.Fields{"component": "CRDWatcher", "namespace": namespace}),
Client: clientset,
Mutex: &sync.Mutex{},
DeploymentWatchers: map[string]ConfigurableWatcher{},
DeploymentWatchers: map[string]*WatcherConfiguration{},
ForceStopChan: make(chan interface{}),
Namespace: namespace,
CleanupTimeout: 15 * time.Minute,
Expand Down Expand Up @@ -109,6 +114,7 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
}

cmc := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration)
c.Logrus.Debugf("Received %s event for %+v", evt.Type, cmc)

switch evt.Type {
case "", watch.Error:
Expand Down Expand Up @@ -145,7 +151,7 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
case watch.Modified:
c.Logrus.Infof("Received MODIFIED event for %s, for deployment %s", cmc.Name, cmc.Spec.DeploymentName)

if err := c.modifyWatcher(cmc); err != nil {
if err := c.modifyWatcher(ctx, cmc, &wg); err != nil {
c.Logrus.Errorf("Error while trying to modify watcher: %s", err)
}

Expand Down Expand Up @@ -182,9 +188,9 @@ func (c *CrdWatcher) Start(ctx context.Context) error {

// Stop all the remaining watchers
c.Mutex.Lock()
for dep, watcher := range c.DeploymentWatchers {
for dep, wc := range c.DeploymentWatchers {
c.Logrus.Infof("Stopping watcher for deployment %s", dep)
if err := watcher.Stop(); err != nil {
if err := wc.Watcher.Stop(); err != nil {
c.Logrus.Warnf("Error while stopping watcher: %s", err)
}
delete(c.DeploymentWatchers, dep)
Expand Down Expand Up @@ -266,7 +272,10 @@ func (c *CrdWatcher) addWatcher(cmc *v1alpha1.ChaosMonkeyConfiguration, dep *api
newWatcher.SetTimeout(parsedDuration)

c.Logrus.Debug("Adding watcher to map")
c.DeploymentWatchers[dep.Name] = newWatcher
c.DeploymentWatchers[dep.Name] = &WatcherConfiguration{
Configuration: cmc,
Watcher: newWatcher,
}

return nil
}
Expand All @@ -275,7 +284,7 @@ func (c *CrdWatcher) startWatcher(ctx context.Context, forDeployment string, wg
c.Mutex.Lock()
defer c.Mutex.Unlock()

watcher, ok := c.DeploymentWatchers[forDeployment]
wc, ok := c.DeploymentWatchers[forDeployment]
if !ok {
return fmt.Errorf("Watcher for deployment %s does not exist", forDeployment)
}
Expand All @@ -284,33 +293,92 @@ func (c *CrdWatcher) startWatcher(ctx context.Context, forDeployment string, wg
go func() {
defer wg.Done()
c.Logrus.Debugf("Starting watcher for %s", forDeployment)
if err := watcher.Start(ctx); err != nil {
if err := wc.Watcher.Start(ctx); err != nil {
c.Logrus.Errorf("Error while starting watcher: %s", err)
}
}()

return nil
}

func (c *CrdWatcher) modifyWatcher(cmc *v1alpha1.ChaosMonkeyConfiguration) error {
func (c *CrdWatcher) modifyWatcher(ctx context.Context, cmc *v1alpha1.ChaosMonkeyConfiguration, wg *sync.WaitGroup) error {
c.Mutex.Lock()
defer c.Mutex.Unlock()

watcher, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName]
wc, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName]
if !ok {
return fmt.Errorf("Watcher for deployment %s does not exist", cmc.Spec.DeploymentName)
}

// The parsing of the duration is the same for PodWatchers and DeploymentWatchers
newDuration, err := time.ParseDuration(cmc.Spec.Timeout)
if err != nil {
newDuration = 10 * time.Minute
c.Logrus.Warnf("Error while parsing timeout: %s, using default of %s", err, newDuration)
}

c.Logrus.Debugf("Reconfiguring watcher with %+v", cmc.Spec)
watcher.SetEnabled(cmc.Spec.Enabled)
watcher.SetMinReplicas(cmc.Spec.MinReplicas)
watcher.SetMaxReplicas(cmc.Spec.MaxReplicas)

parsedDuration, err := time.ParseDuration(cmc.Spec.Timeout)
if err != nil {
c.Logrus.Warnf("Error while parsing timeout: %s, not modifying it", err)
if wc.Configuration.Spec.PodMode != cmc.Spec.PodMode {
c.Logrus.Infof("CMC %s changed its pod mode, recreating the watcher from scratch", cmc.Name)

c.Logrus.Debugf("Stopping watcher %s", cmc.Name)
if err := wc.Watcher.Stop(); err != nil {
return err
}

delete(c.DeploymentWatchers, cmc.Spec.DeploymentName)

// Get the deployment
dep, err := c.DeploymentInterface.Get(context.Background(), cmc.Spec.DeploymentName, metav1.GetOptions{})
if err != nil {
return err
}

var newWatcher ConfigurableWatcher
if cmc.Spec.PodMode {
c.Logrus.Debug("Creating new Pod watcher")

allLabels := []string{}
for key, val := range dep.Spec.Selector.MatchLabels {
allLabels = append(allLabels, fmt.Sprintf("%s=%s", key, val))
}

newWatcher = DefaultPodFactory(c.Client, nil, dep.Namespace, allLabels...)
} else {
c.Logrus.Debug("Creating new Deployment watcher")
newWatcher = DefaultDeploymentFactory(c.Client, nil, dep)
}

// Configure the watcher
newWatcher.SetEnabled(cmc.Spec.Enabled)
newWatcher.SetMinReplicas(cmc.Spec.MinReplicas)
newWatcher.SetMaxReplicas(cmc.Spec.MaxReplicas)
newWatcher.SetTimeout(newDuration)

// Start the watcher
c.Logrus.Info("Starting the newly created watcher")

wg.Add(1)
go func() {
defer wg.Done()
c.Logrus.Debugf("Starting watcher for %s", cmc.Spec.DeploymentName)
if err := newWatcher.Start(ctx); err != nil {
c.Logrus.Errorf("Error while starting watcher: %s", err)
}
}()

// Put it into the map
c.DeploymentWatchers[cmc.Spec.DeploymentName] = &WatcherConfiguration{
Configuration: cmc,
Watcher: newWatcher,
}
} else {
watcher.SetTimeout(parsedDuration)
c.Logrus.Debug("Not recreating a new watcher")
wc.Watcher.SetEnabled(cmc.Spec.Enabled)
wc.Watcher.SetMinReplicas(cmc.Spec.MinReplicas)
wc.Watcher.SetMaxReplicas(cmc.Spec.MaxReplicas)
wc.Watcher.SetTimeout(newDuration)
}

return nil
Expand All @@ -322,8 +390,8 @@ func (c *CrdWatcher) deleteWatcher(cmc *v1alpha1.ChaosMonkeyConfiguration) error

c.Logrus.Infof("Deleting watcher for %s", cmc.Spec.DeploymentName)

if watcher, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName]; ok {
if err := watcher.Stop(); err != nil {
if wc, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName]; ok {
if err := wc.Watcher.Stop(); err != nil {
c.Logrus.Warnf("Error while stopping watcher: %s", err)
}
delete(c.DeploymentWatchers, cmc.Spec.DeploymentName)
Expand All @@ -338,8 +406,8 @@ func (c *CrdWatcher) cleanUp() {
c.Mutex.Lock()
defer c.Mutex.Unlock()

for name, watcher := range c.DeploymentWatchers {
if !watcher.IsRunning() {
for name, wc := range c.DeploymentWatchers {
if !wc.Watcher.IsRunning() {
c.Logrus.Infof("Removing watcher for %s", name)
delete(c.DeploymentWatchers, name)
}
Expand All @@ -353,9 +421,9 @@ func (c *CrdWatcher) restartWatch(ctx context.Context, wg *sync.WaitGroup) (watc
c.Logrus.Info("Restarting CRD Watcher")

c.Logrus.Debug("Cleaning existing watchers")
for key, w := range c.DeploymentWatchers {
for key, wc := range c.DeploymentWatchers {
c.Logrus.Debugf("Stopping watcher for %s", key)
if err := w.Stop(); err != nil {
if err := wc.Watcher.Stop(); err != nil {
c.Logrus.Warnf("Error while stopping watcher for %s: %s", key, err)
}

Expand Down
Loading

0 comments on commit 26b1783

Please sign in to comment.