diff --git a/internal/watcher/crd.go b/internal/watcher/crd.go
index 81579d5..0cef59d 100644
--- a/internal/watcher/crd.go
+++ b/internal/watcher/crd.go
@@ -31,9 +31,11 @@ type CrdWatcher struct {
     Client             kubernetes.Interface
     Mutex              *sync.Mutex
     DeploymentWatchers map[string]ConfigurableWatcher
+    ForceStopChan      chan interface{}
     Namespace          string
-    Running            bool
     CleanupTimeout     time.Duration
+    WatcherTimeout     time.Duration
+    Running            bool
 }
 
 var _ = (Watcher)((*CrdWatcher)(nil))
@@ -49,15 +51,17 @@ func NewCrdWatcher(clientset kubernetes.Interface, cmcClientset typedcmc.Interfa
 
     return &CrdWatcher{
         ChaosMonkeyConfigurationInterface: cmcClientset.ChaosMonkeyConfigurationV1alpha1().ChaosMonkeyConfigurations(namespace),
-
-        DeploymentInterface: clientset.AppsV1().Deployments(namespace),
-        EventRecorderLogger: recorder,
-        Mutex:               &sync.Mutex{},
-        Namespace:           namespace,
-        Running:             false,
-        DeploymentWatchers:  map[string]ConfigurableWatcher{},
-        Client:              clientset,
-        CleanupTimeout:      1 * time.Minute,
+        DeploymentInterface:               clientset.AppsV1().Deployments(namespace),
+        EventRecorderLogger:               recorder,
+
+        Client:             clientset,
+        Mutex:              &sync.Mutex{},
+        DeploymentWatchers: map[string]ConfigurableWatcher{},
+        ForceStopChan:      make(chan interface{}),
+        Namespace:          namespace,
+        CleanupTimeout:     15 * time.Minute,
+        WatcherTimeout:     24 * time.Hour,
+        Running:            false,
     }
 }
 
@@ -75,7 +79,7 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
     var err error
     var wg sync.WaitGroup
 
-    watchTimeout := int64((24 * time.Hour).Seconds())
+    watchTimeout := int64(c.WatcherTimeout.Seconds())
     w, err := c.ChaosMonkeyConfigurationInterface.Watch(ctx, metav1.ListOptions{
         Watch:          true,
         TimeoutSeconds: &watchTimeout,
@@ -91,12 +95,22 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
     for c.IsRunning() {
         select {
         case evt := <-w.ResultChan():
-            cmc := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration)
+            cmc, ok := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration)
+            if !ok {
+                logrus.Warnf("Watch for %s timed out", c.Namespace)
+                w, err = c.restartWatchers(ctx, &wg)
+                if err != nil {
+                    logrus.Errorf("Error while restarting watchers: %s", err)
+                    c.setRunning(false)
+                }
+
+                break
+            }
 
             switch evt.Type {
             case "", watch.Error:
                 logrus.Errorf("Received empty error or event from CRD watcher: %+v", evt)
-                _ = c.Stop()
+                c.setRunning(false)
                 err = errors.New("Empty event or error from CRD watcher")
 
             case watch.Added:
@@ -145,13 +159,19 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
                 logrus.Debug("All is good! Publishing event.")
                 c.EventRecorderLogger.Eventf(cmc, "Normal", "Deleted", "Watcher deleted for deployment %s", cmc.Spec.DeploymentName)
             }
+
        case <-ctx.Done():
            logrus.Infof("Watcher context done")
-            _ = c.Stop()
+            c.setRunning(false)
 
        case <-time.After(c.CleanupTimeout):
            logrus.Debug("Garbage collecting Chaos Monkeys")
            c.cleanUp()
+
+        case <-c.ForceStopChan:
+            // This is here just to wake up early from the loop
+            logrus.Info("Force stopping CRD Watcher")
+            c.setRunning(false)
        }
    }
 
@@ -180,6 +200,15 @@ func (c *CrdWatcher) Stop() error {
 
     c.Running = false
     logrus.Debugf("Stopping CRD watcher for %s", c.Namespace)
+    logrus.Debug("Force stopping")
+
+    select {
+    case c.ForceStopChan <- nil:
+    default:
+        logrus.Warn("Could not write to ForceStopChannel")
+    }
+
+    close(c.ForceStopChan)
 
     return nil
 }
@@ -313,3 +342,29 @@ func (c *CrdWatcher) cleanUp() {
         }
     }
 }
+
+func (c *CrdWatcher) restartWatchers(ctx context.Context, wg *sync.WaitGroup) (watch.Interface, error) {
+    c.Mutex.Lock()
+    defer c.Mutex.Unlock()
+
+    logrus.Infof("Restarting CRD Watcher for %s", c.Namespace)
+
+    logrus.Debug("Cleaning existing watchers")
+    for key, w := range c.DeploymentWatchers {
+        logrus.Debugf("Stopping watcher for %s", key)
+        if err := w.Stop(); err != nil {
+            logrus.Warnf("Error while stopping watcher for %s: %s", key, err)
+        }
+
+        delete(c.DeploymentWatchers, key)
+    }
+
+    logrus.Info("Waiting for monkeys to get back home")
+    wg.Wait()
+
+    timeoutSeconds := int64(c.WatcherTimeout.Seconds())
+    return c.ChaosMonkeyConfigurationInterface.Watch(ctx, metav1.ListOptions{
+        Watch:          true,
+        TimeoutSeconds: &timeoutSeconds,
+    })
+}
diff --git a/internal/watcher/crd_test.go b/internal/watcher/crd_test.go
index 26929a9..7bb2134 100644
--- a/internal/watcher/crd_test.go
+++ b/internal/watcher/crd_test.go
@@ -2,8 +2,10 @@ package watcher_test
 
 import (
     "context"
+    "fmt"
     "strings"
     "sync"
+    "sync/atomic"
     "testing"
     "time"
 
@@ -339,3 +341,79 @@ func TestCRDWatcher_Cleanup(t *testing.T) {
         t.Error("Watcher should be stopped")
     }
 }
+
+func TestCRDWatcher_Restart(t *testing.T) {
+    clientSet := kubernetes.NewSimpleClientset()
+    cmClientset := cmc.NewSimpleClientset()
+    w := watcher.DefaultCrdFactory(clientSet, cmClientset, record.NewFakeRecorder(1024), "chaos-monkey").(*watcher.CrdWatcher)
+    w.CleanupTimeout = 1 * time.Second
+    timesRestarted := &atomic.Int32{}
+    timesRestarted.Store(0)
+
+    watcher.DefaultDeploymentFactory = func(clientset k.Interface, recorder record.EventRecorderLogger, deployment *appsv1.Deployment) watcher.ConfigurableWatcher {
+        return &FakeDeploymentWatcher{Mutex: &sync.Mutex{}}
+    }
+
+    // Setup the scenario for the CMCs
+    cmClientset.PrependWatchReactor("chaosmonkeyconfigurations", func(action ktest.Action) (handled bool, ret watch.Interface, err error) {
+        fakeWatch := watch.NewFake()
+        timesRestarted.Add(1)
+
+        go func() {
+            for i := range [10]int{} {
+                depName := fmt.Sprintf("test-%d", i)
+                fakeWatch.Add(&v1alpha1.ChaosMonkeyConfiguration{
+                    ObjectMeta: metav1.ObjectMeta{Name: depName},
+                    Spec: v1alpha1.ChaosMonkeyConfigurationSpec{
+                        PodMode:        false,
+                        Timeout:        "10s",
+                        MinReplicas:    0,
+                        MaxReplicas:    10,
+                        Enabled:        false,
+                        DeploymentName: depName,
+                    },
+                })
+                time.Sleep(100 * time.Millisecond)
+            }
+
+            fakeWatch.Stop()
+        }()
+        return true, fakeWatch, nil
+    })
+
+    clientSet.PrependReactor("get", "deployments", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
+        requestedName := action.(ktest.GetAction).GetName()
+        return true, &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: requestedName}}, nil
+    })
+
+    // Start the watcher in background
+    done := make(chan interface{})
+    defer close(done)
+
+    go func() {
+        if err := w.Start(context.Background()); err != nil {
+            t.Error(err)
+        }
+
+        done <- nil
+    }()
+
+    time.Sleep(4300 * time.Millisecond)
+
+    // It should still be running
+    w.Mutex.Lock()
+    if !w.Running {
+        t.Error("Watcher should be running")
+    }
+    w.Mutex.Unlock()
+
+    // Now stop it and verify that the watcher restarted 5 times
+    if err := w.Stop(); err != nil {
+        t.Error(err)
+    }
+    <-done
+
+    if timesRestarted.Load() != 5 {
+        t.Errorf("Expected 5 restarts, got %d", timesRestarted.Load())
+    }
+}
diff --git a/internal/watcher/pod.go b/internal/watcher/pod.go
index c7ada8e..5035b45 100644
--- a/internal/watcher/pod.go
+++ b/internal/watcher/pod.go
@@ -179,7 +179,7 @@ func (p *PodWatcher) Start(ctx context.Context) error {
         timer.Reset(p.getTimeout())
     }
 
-    logrus.Infof("Stopping pod watcher in namespace %s", p.Namespace)
+    logrus.Infof("Pod watcher in namespace %s finished", p.Namespace)
 
     return err
 }