// Unified diff (line breaks collapsed) for internal/watcher/crd.go and
// internal/watcher/crd_test.go.
//
// crd.go
//   - Adds WatcherConfiguration, which pairs a *v1alpha1.ChaosMonkeyConfiguration
//     with the ConfigurableWatcher created for it.
//   - CrdWatcher.DeploymentWatchers changes from map[string]ConfigurableWatcher to
//     map[string]*WatcherConfiguration. The shutdown loop in Start, startWatcher,
//     deleteWatcher, cleanUp and restartWatch now reach the watcher via wc.Watcher.
//   - addWatcher stores the CMC next to the freshly built watcher.
//   - Start logs every received event at debug level and now calls
//     modifyWatcher(ctx, cmc, &wg).
//   - modifyWatcher parses cmc.Spec.Timeout up front and falls back to 10 minutes
//     with a warning; the removed code only warned and left the timeout untouched.
//     When Spec.PodMode differs from the stored configuration it stops the old
//     watcher, removes the map entry, fetches the deployment, builds a replacement
//     through DefaultPodFactory (selector built from dep.Spec.Selector.MatchLabels)
//     or DefaultDeploymentFactory, configures it, starts it in a goroutine tracked
//     by wg and stores it again. Otherwise the existing watcher is updated in place.
//
// crd_test.go
//   - createCMC becomes a package-level helper and is reused by
//     TestCRDWatcher_Restart.
//   - Existing tests are adapted to the new map value type.
//   - New TestCRDWatcher_ModifyWatcherType counts factory calls and checks that a
//     MODIFIED event flipping PodMode creates exactly one watcher of the other kind
//     while the map still holds two entries.
//
// NOTE(review): in the recreate path the old watcher is stopped and deleted before
// the deployment Get; if that Get fails the function returns with no watcher
// registered for the deployment -- confirm this is intended.
// NOTE(review): that Get uses context.Background() rather than the ctx parameter.
// NOTE(review): the in-place branch does not refresh wc.Configuration; only PodMode
// is compared against it, so this looks harmless today -- confirm.
// NOTE(review): the new test overwrites watcher.DefaultDeploymentFactory and
// watcher.DefaultPodFactory; no restore is visible in this hunk.
// NOTE(review): TestCRDWatcher_Cleanup injects entries with Configuration: nil;
// modifyWatcher dereferences wc.Configuration, so a MODIFIED event for such an
// entry would presumably panic -- verify no test path does this.
diff --git a/internal/watcher/crd.go b/internal/watcher/crd.go index 30e6efd..23db238 100644 --- a/internal/watcher/crd.go +++ b/internal/watcher/crd.go @@ -23,6 +23,11 @@ import ( "k8s.io/client-go/tools/record" ) +type WatcherConfiguration struct { + Configuration *v1alpha1.ChaosMonkeyConfiguration + Watcher ConfigurableWatcher +} + type CrdWatcher struct { cmv1alpha1.ChaosMonkeyConfigurationInterface appsv1.DeploymentInterface @@ -31,7 +36,7 @@ type CrdWatcher struct { Logrus logrus.FieldLogger Client kubernetes.Interface Mutex *sync.Mutex - DeploymentWatchers map[string]ConfigurableWatcher + DeploymentWatchers map[string]*WatcherConfiguration ForceStopChan chan interface{} Namespace string CleanupTimeout time.Duration @@ -58,7 +63,7 @@ func NewCrdWatcher(clientset kubernetes.Interface, cmcClientset typedcmc.Interfa Logrus: logrus.WithFields(logrus.Fields{"component": "CRDWatcher", "namespace": namespace}), Client: clientset, Mutex: &sync.Mutex{}, - DeploymentWatchers: map[string]ConfigurableWatcher{}, + DeploymentWatchers: map[string]*WatcherConfiguration{}, ForceStopChan: make(chan interface{}), Namespace: namespace, CleanupTimeout: 15 * time.Minute, @@ -109,6 +114,7 @@ func (c *CrdWatcher) Start(ctx context.Context) error { } cmc := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration) + c.Logrus.Debugf("Received %s event for %+v", evt.Type, cmc) switch evt.Type { case "", watch.Error: @@ -145,7 +151,7 @@ func (c *CrdWatcher) Start(ctx context.Context) error { case watch.Modified: c.Logrus.Infof("Received MODIFIED event for %s, for deployment %s", cmc.Name, cmc.Spec.DeploymentName) - if err := c.modifyWatcher(cmc); err != nil { + if err := c.modifyWatcher(ctx, cmc, &wg); err != nil { c.Logrus.Errorf("Error while trying to modify watcher: %s", err) } @@ -182,9 +188,9 @@ func (c *CrdWatcher) Start(ctx context.Context) error { // Stop all the remaining watchers c.Mutex.Lock() - for dep, watcher := range c.DeploymentWatchers { + for dep, wc := range
c.DeploymentWatchers { c.Logrus.Infof("Stopping watcher for deployment %s", dep) - if err := watcher.Stop(); err != nil { + if err := wc.Watcher.Stop(); err != nil { c.Logrus.Warnf("Error while stopping watcher: %s", err) } delete(c.DeploymentWatchers, dep) @@ -266,7 +272,10 @@ func (c *CrdWatcher) addWatcher(cmc *v1alpha1.ChaosMonkeyConfiguration, dep *api newWatcher.SetTimeout(parsedDuration) c.Logrus.Debug("Adding watcher to map") - c.DeploymentWatchers[dep.Name] = newWatcher + c.DeploymentWatchers[dep.Name] = &WatcherConfiguration{ + Configuration: cmc, + Watcher: newWatcher, + } return nil } @@ -275,7 +284,7 @@ func (c *CrdWatcher) startWatcher(ctx context.Context, forDeployment string, wg c.Mutex.Lock() defer c.Mutex.Unlock() - watcher, ok := c.DeploymentWatchers[forDeployment] + wc, ok := c.DeploymentWatchers[forDeployment] if !ok { return fmt.Errorf("Watcher for deployment %s does not exist", forDeployment) } @@ -284,7 +293,7 @@ func (c *CrdWatcher) startWatcher(ctx context.Context, forDeployment string, wg go func() { defer wg.Done() c.Logrus.Debugf("Starting watcher for %s", forDeployment) - if err := watcher.Start(ctx); err != nil { + if err := wc.Watcher.Start(ctx); err != nil { c.Logrus.Errorf("Error while starting watcher: %s", err) } }() @@ -292,25 +301,84 @@ func (c *CrdWatcher) startWatcher(ctx context.Context, forDeployment string, wg return nil } -func (c *CrdWatcher) modifyWatcher(cmc *v1alpha1.ChaosMonkeyConfiguration) error { +func (c *CrdWatcher) modifyWatcher(ctx context.Context, cmc *v1alpha1.ChaosMonkeyConfiguration, wg *sync.WaitGroup) error { c.Mutex.Lock() defer c.Mutex.Unlock() - watcher, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName] + wc, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName] if !ok { return fmt.Errorf("Watcher for deployment %s does not exist", cmc.Spec.DeploymentName) } + // The parsing of the duration is the same for PodWatchers and DeploymentWatchers + newDuration, err := time.ParseDuration(cmc.Spec.Timeout) +
if err != nil { + newDuration = 10 * time.Minute + c.Logrus.Warnf("Error while parsing timeout: %s, using default of %s", err, newDuration) + } + c.Logrus.Debugf("Reconfiguring watcher with %+v", cmc.Spec) - watcher.SetEnabled(cmc.Spec.Enabled) - watcher.SetMinReplicas(cmc.Spec.MinReplicas) - watcher.SetMaxReplicas(cmc.Spec.MaxReplicas) - parsedDuration, err := time.ParseDuration(cmc.Spec.Timeout) - if err != nil { - c.Logrus.Warnf("Error while parsing timeout: %s, not modifying it", err) + if wc.Configuration.Spec.PodMode != cmc.Spec.PodMode { + c.Logrus.Infof("CMC %s changed its pod mode, recreating the watcher from scratch", cmc.Name) + + c.Logrus.Debugf("Stopping watcher %s", cmc.Name) + if err := wc.Watcher.Stop(); err != nil { + return err + } + + delete(c.DeploymentWatchers, cmc.Spec.DeploymentName) + + // Get the deployment + dep, err := c.DeploymentInterface.Get(context.Background(), cmc.Spec.DeploymentName, metav1.GetOptions{}) + if err != nil { + return err + } + + var newWatcher ConfigurableWatcher + if cmc.Spec.PodMode { + c.Logrus.Debug("Creating new Pod watcher") + + allLabels := []string{} + for key, val := range dep.Spec.Selector.MatchLabels { + allLabels = append(allLabels, fmt.Sprintf("%s=%s", key, val)) + } + + newWatcher = DefaultPodFactory(c.Client, nil, dep.Namespace, allLabels...)
+ } else { + c.Logrus.Debug("Creating new Deployment watcher") + newWatcher = DefaultDeploymentFactory(c.Client, nil, dep) + } + + // Configure the watcher + newWatcher.SetEnabled(cmc.Spec.Enabled) + newWatcher.SetMinReplicas(cmc.Spec.MinReplicas) + newWatcher.SetMaxReplicas(cmc.Spec.MaxReplicas) + newWatcher.SetTimeout(newDuration) + + // Start the watcher + c.Logrus.Info("Starting the newly created watcher") + + wg.Add(1) + go func() { + defer wg.Done() + c.Logrus.Debugf("Starting watcher for %s", cmc.Spec.DeploymentName) + if err := newWatcher.Start(ctx); err != nil { + c.Logrus.Errorf("Error while starting watcher: %s", err) + } + }() + + // Put it into the map + c.DeploymentWatchers[cmc.Spec.DeploymentName] = &WatcherConfiguration{ + Configuration: cmc, + Watcher: newWatcher, + } } else { - watcher.SetTimeout(parsedDuration) + c.Logrus.Debug("Not recreating a new watcher") + wc.Watcher.SetEnabled(cmc.Spec.Enabled) + wc.Watcher.SetMinReplicas(cmc.Spec.MinReplicas) + wc.Watcher.SetMaxReplicas(cmc.Spec.MaxReplicas) + wc.Watcher.SetTimeout(newDuration) } return nil @@ -322,8 +390,8 @@ func (c *CrdWatcher) deleteWatcher(cmc *v1alpha1.ChaosMonkeyConfiguration) error c.Logrus.Infof("Deleting watcher for %s", cmc.Spec.DeploymentName) - if watcher, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName]; ok { - if err := watcher.Stop(); err != nil { + if wc, ok := c.DeploymentWatchers[cmc.Spec.DeploymentName]; ok { + if err := wc.Watcher.Stop(); err != nil { c.Logrus.Warnf("Error while stopping watcher: %s", err) } delete(c.DeploymentWatchers, cmc.Spec.DeploymentName) @@ -338,8 +406,8 @@ func (c *CrdWatcher) cleanUp() { c.Mutex.Lock() defer c.Mutex.Unlock() - for name, watcher := range c.DeploymentWatchers { - if !watcher.IsRunning() { + for name, wc := range c.DeploymentWatchers { + if !wc.Watcher.IsRunning() { c.Logrus.Infof("Removing watcher for %s", name) delete(c.DeploymentWatchers, name) } @@ -353,9 +421,9 @@ func (c *CrdWatcher) restartWatch(ctx context.Context, wg
*sync.WaitGroup) (watc c.Logrus.Info("Restarting CRD Watcher") c.Logrus.Debug("Cleaning existing watchers") - for key, w := range c.DeploymentWatchers { + for key, wc := range c.DeploymentWatchers { c.Logrus.Debugf("Stopping watcher for %s", key) - if err := w.Stop(); err != nil { + if err := wc.Watcher.Stop(); err != nil { c.Logrus.Warnf("Error while stopping watcher for %s: %s", key, err) } diff --git a/internal/watcher/crd_test.go b/internal/watcher/crd_test.go index 7bb2134..5ddd33c 100644 --- a/internal/watcher/crd_test.go +++ b/internal/watcher/crd_test.go @@ -101,6 +101,22 @@ func TestCRDWatcher_Create(t *testing.T) { } } +func createCMC(name string, enabled, podMode bool, minReplicas, maxReplicas int, deploymentName, timeout string) *v1alpha1.ChaosMonkeyConfiguration { + return &v1alpha1.ChaosMonkeyConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha1.ChaosMonkeyConfigurationSpec{ + Enabled: enabled, + MinReplicas: minReplicas, + MaxReplicas: maxReplicas, + DeploymentName: deploymentName, + Timeout: timeout, + PodMode: podMode, + }, + } +} + func TestCRDWatcher_BasicBehaviour(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) clientSet := kubernetes.NewSimpleClientset() @@ -123,22 +139,6 @@ func TestCRDWatcher_BasicBehaviour(t *testing.T) { fakeWatch := watch.NewFake() go func() { - createCMC := func(name string, enabled, podMode bool, minReplicas, maxReplicas int, deploymentName, timeout string) *v1alpha1.ChaosMonkeyConfiguration { - return &v1alpha1.ChaosMonkeyConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1alpha1.ChaosMonkeyConfigurationSpec{ - Enabled: enabled, - MinReplicas: minReplicas, - MaxReplicas: maxReplicas, - DeploymentName: deploymentName, - Timeout: timeout, - PodMode: podMode, - }, - } - } - fakeWatch.Add(createCMC("test-1", true, false, 1, 1, "test-1", "1s")) fakeWatch.Add(createCMC("test-2", false, true, 1, 1, "test-2", "10s")) fakeWatch.Add(createCMC("test-3", true, true, 1, 1,
"test-3", "invalidstring")) @@ -195,7 +195,7 @@ func TestCRDWatcher_BasicBehaviour(t *testing.T) { for key, crd := range w.DeploymentWatchers { if key == depName { found = true - check(crd.(*FakeDeploymentWatcher)) + check(crd.Watcher.(*FakeDeploymentWatcher)) } } @@ -288,10 +288,10 @@ func TestCRDWatcher_Cleanup(t *testing.T) { w.CleanupTimeout = 1 * time.Second // Inject some FakeDeploymentWatchers inside the watcher itself - w.DeploymentWatchers = map[string]watcher.ConfigurableWatcher{ - "test-1": &FakeDeploymentWatcher{Running: true, Mutex: &sync.Mutex{}}, - "test-2": &FakeDeploymentWatcher{Running: false, Mutex: &sync.Mutex{}}, - "test-3": &FakeDeploymentWatcher{Running: false, Mutex: &sync.Mutex{}}, + w.DeploymentWatchers = map[string]*watcher.WatcherConfiguration{ + "test-1": {Configuration: nil, Watcher: &FakeDeploymentWatcher{Running: true, Mutex: &sync.Mutex{}}}, + "test-2": {Configuration: nil, Watcher: &FakeDeploymentWatcher{Running: false, Mutex: &sync.Mutex{}}}, + "test-3": {Configuration: nil, Watcher: &FakeDeploymentWatcher{Running: false, Mutex: &sync.Mutex{}}}, } // Setup the scenario for the CMCs @@ -362,17 +362,7 @@ func TestCRDWatcher_Restart(t *testing.T) { go func() { for i := range [10]int{} { depName := fmt.Sprintf("test-%d", i) - fakeWatch.Add(&v1alpha1.ChaosMonkeyConfiguration{ - ObjectMeta: metav1.ObjectMeta{Name: depName}, - Spec: v1alpha1.ChaosMonkeyConfigurationSpec{ - PodMode: false, - Timeout: "10s", - MinReplicas: 0, - MaxReplicas: 10, - Enabled: false, - DeploymentName: depName, - }, - }) + fakeWatch.Add(createCMC(depName, false, false, 0, 10, depName, "10s")) time.Sleep(100 * time.Millisecond) } @@ -417,3 +407,119 @@ func TestCRDWatcher_Restart(t *testing.T) { t.Errorf("Expected 5 restarts, got %d", timesRestarted.Load()) } } + +func TestCRDWatcher_ModifyWatcherType(t *testing.T) { + clientSet := kubernetes.NewSimpleClientset() + cmClientset := cmc.NewSimpleClientset() + w := watcher.DefaultCrdFactory(clientSet, cmClientset,
record.NewFakeRecorder(1024), "chaos-monkey").(*watcher.CrdWatcher) + w.CleanupTimeout = 1 * time.Second + + // Number of times each watcher has been created + podWatchers := &atomic.Int32{} + deployWatchers := &atomic.Int32{} + podWatchers.Store(0) + deployWatchers.Store(0) + + fakeWatch := watch.NewFake() + + watcher.DefaultDeploymentFactory = func(clientset k.Interface, recorder record.EventRecorderLogger, deployment *appsv1.Deployment) watcher.ConfigurableWatcher { + deployWatchers.Add(1) + return &FakeDeploymentWatcher{Mutex: &sync.Mutex{}} + } + + watcher.DefaultPodFactory = func(clientset k.Interface, recorder record.EventRecorderLogger, namespace string, labelSelector ...string) watcher.ConfigurableWatcher { + podWatchers.Add(1) + return &FakeDeploymentWatcher{Mutex: &sync.Mutex{}} + } + + clientSet.PrependReactor("get", "deployments", func(action ktest.Action) (handled bool, ret runtime.Object, err error) { + requestedName := action.(ktest.GetAction).GetName() + return true, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: requestedName}, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": requestedName}}, + }, + }, nil + }) + + cmClientset.PrependWatchReactor("chaosmonkeyconfigurations", func(action ktest.Action) (handled bool, ret watch.Interface, err error) { + go func() { + fakeWatch.Add(createCMC("test-deploy", false, false, 0, 10, "test-deploy", "10s")) + fakeWatch.Add(createCMC("test-pod", false, true, 0, 10, "test-pod", "10s")) + }() + + return true, fakeWatch, nil + }) + + done := make(chan interface{}) + defer close(done) + + go func() { + if err := w.Start(context.Background()); err != nil { + t.Error(err) + } + + done <- nil + }() + + // Wait for the events to be processed + time.Sleep(500 * time.Millisecond) + + // We should have 2 watchers + w.Mutex.Lock() + if cnt := len(w.DeploymentWatchers); cnt != 2 { + t.Errorf("Expected 2 watchers, got %d", cnt) + } + if podWatchers.Load() != 1
{ + t.Errorf("Expected 1 pod watcher, got %d", podWatchers.Load()) + } + if deployWatchers.Load() != 1 { + t.Errorf("Expected 1 deployment watcher, got %d", deployWatchers.Load()) + } + + w.Mutex.Unlock() + + // Now send a Modify event + fakeWatch.Modify(createCMC("test-deploy", false, true, 0, 10, "test-deploy", "10s")) + time.Sleep(100 * time.Millisecond) + + // We should still have 2 watchers + w.Mutex.Lock() + if cnt := len(w.DeploymentWatchers); cnt != 2 { + t.Errorf("Expected 2 watchers, got %d", cnt) + } + + // This time we should have 2 podwatchers created + if podWatchers.Load() != 2 { + t.Errorf("Expected 2 pod watchers, got %d", podWatchers.Load()) + } + + // But still only 1 deploywatchers + if deployWatchers.Load() != 1 { + t.Errorf("Expected 1 deployment watcher, got %d", deployWatchers.Load()) + } + w.Mutex.Unlock() + + // Now send another Modify event + fakeWatch.Modify(createCMC("test-pod", false, false, 0, 10, "test-pod", "10s")) + time.Sleep(100 * time.Millisecond) + + // Still 2 watchers + w.Mutex.Lock() + if cnt := len(w.DeploymentWatchers); cnt != 2 { + t.Errorf("Expected 2 watchers, got %d", cnt) + } + + // Now both should have 2 calls + if podWatchers.Load() != 2 { + t.Errorf("Expected 2 pod watchers, got %d", podWatchers.Load()) + } + + if deployWatchers.Load() != 2 { + t.Errorf("Expected 2 deployment watchers, got %d", deployWatchers.Load()) + } + w.Mutex.Unlock() + + _ = w.Stop() + <-done +}