diff --git a/Makefile b/Makefile index 5600f14..f855a29 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ TERRAFORM := $(shell which terraform) DOCKER := $(shell which docker) APPNAME ?= chaos-monkey IMAGE ?= chaos-monkey -TAG ?= 2.0.2 +TAG ?= 2.0.3 all: bin/$(APPNAME) .PHONY: clean generate bin/$(APPNAME) image-version cluster-test diff --git a/internal/watcher/crd.go b/internal/watcher/crd.go index e7bf105..81579d5 100644 --- a/internal/watcher/crd.go +++ b/internal/watcher/crd.go @@ -75,12 +75,15 @@ func (c *CrdWatcher) Start(ctx context.Context) error { var err error var wg sync.WaitGroup + watchTimeout := int64((24 * time.Hour).Seconds()) w, err := c.ChaosMonkeyConfigurationInterface.Watch(ctx, metav1.ListOptions{ - Watch: true, + Watch: true, + TimeoutSeconds: &watchTimeout, }) if err != nil { return err } + defer w.Stop() c.setRunning(true) @@ -88,11 +91,6 @@ func (c *CrdWatcher) Start(ctx context.Context) error { for c.IsRunning() { select { case evt := <-w.ResultChan(): - if evt.Object == nil { - logrus.Warnf("Received event with nil object: %+v", evt) - continue - } - cmc := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration) switch evt.Type { diff --git a/internal/watcher/crd_test.go b/internal/watcher/crd_test.go index 6225e70..26929a9 100644 --- a/internal/watcher/crd_test.go +++ b/internal/watcher/crd_test.go @@ -339,42 +339,3 @@ func TestCRDWatcher_Cleanup(t *testing.T) { t.Error("Watcher should be stopped") } } - -func TestCRDWatcher_NilObject(t *testing.T) { - clientSet := kubernetes.NewSimpleClientset() - cmClientset := cmc.NewSimpleClientset() - w := watcher.DefaultCrdFactory(clientSet, cmClientset, record.NewFakeRecorder(1024), "chaos-monkey").(*watcher.CrdWatcher) - w.CleanupTimeout = 300 * time.Millisecond - - // Setup the scenario for the CMCs - cmClientset.PrependWatchReactor("chaosmonkeyconfigurations", func(action ktest.Action) (handled bool, ret watch.Interface, err error) { - fakeWatch := watch.NewFake() - go func() { - fakeWatch.Add(nil) - }() - return true, fakeWatch, nil - }) - - // Now start the watcher - done := make(chan struct{}) - defer close(done) - - go func() { - if err := w.Start(context.Background()); err != nil { - t.Error(err) - } - - done <- struct{}{} - }() - - time.Sleep(1 * time.Second) - - // Despite the nil event the watcher is still running - if !w.IsRunning() { - t.Error("Watcher stopped") - } - - _ = w.Stop() - - <-done -} diff --git a/internal/watcher/namespace.go b/internal/watcher/namespace.go index 8ce6106..3426c1a 100644 --- a/internal/watcher/namespace.go +++ b/internal/watcher/namespace.go @@ -74,10 +74,15 @@ func (n *NamespaceWatcher) Start(ctx context.Context) error { logrus.Infof("Starting namespace watcher for %s", n.RootNamespace) - w, err := n.Watch(ctx, v1.ListOptions{}) + timeoutSeconds := int64((24 * time.Hour).Seconds()) + w, err := n.Watch(ctx, v1.ListOptions{ + Watch: true, + TimeoutSeconds: &timeoutSeconds, + }) if err != nil { return err } + defer w.Stop() n.setRunning(true) @@ -85,11 +90,6 @@ func (n *NamespaceWatcher) Start(ctx context.Context) error { for n.IsRunning() { select { case evt := <-w.ResultChan(): - if evt.Object == nil { - logrus.Warnf("Received event with nil object: %+v", evt) - continue - } - ns := evt.Object.(*corev1.Namespace) switch evt.Type { diff --git a/internal/watcher/namespace_test.go b/internal/watcher/namespace_test.go index 5acf494..7b06c67 100644 --- a/internal/watcher/namespace_test.go +++ b/internal/watcher/namespace_test.go @@ -245,42 +245,3 @@ func TestNamespaceWatcher_Cleanup(t *testing.T) { t.Errorf("Expected 0 watchers, got %d", len(w.CrdWatchers)) } } - -func TestNamespaceWatcher_NilObject(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) - clientset := kubernetes.NewSimpleClientset() - w := watcher.DefaultNamespaceFactory(clientset, cmcClientset, record.NewFakeRecorder(1024), "chaos-monkey") - w.(*watcher.NamespaceWatcher).CleanupTimeout = 300 * time.Millisecond - - clientset.PrependWatchReactor("namespaces", func(action ktest.Action) (handled bool, ret watch.Interface, err error) { - fakeWatch := watch.NewFake() - - go func() { - fakeWatch.Add(nil) - }() - return true, fakeWatch, nil - }) - - // Now start the watcher - done := make(chan struct{}, 1) - defer close(done) - - go func() { - if err := w.Start(context.Background()); err != nil { - t.Error(err) - } - - done <- struct{}{} - }() - - // Wait for all the events to be processed - time.Sleep(1 * time.Second) - - // Despite the nil event, the watcher is still running - if !w.IsRunning() { - t.Error("Watcher stopped") - } - - _ = w.Stop() - <-done -} diff --git a/internal/watcher/pod.go b/internal/watcher/pod.go index 9542f9c..c7ada8e 100644 --- a/internal/watcher/pod.go +++ b/internal/watcher/pod.go @@ -99,9 +99,12 @@ func (p *PodWatcher) IsRunning() bool { func (p *PodWatcher) Start(ctx context.Context) error { var err error logrus.Infof("Starting pod watcher in namespace %s", p.Namespace) + + watchTimeout := int64((24 * time.Hour).Seconds()) w, err := p.Watch(ctx, metav1.ListOptions{ - Watch: true, - LabelSelector: p.LabelSelector, + Watch: true, + LabelSelector: p.LabelSelector, + TimeoutSeconds: &watchTimeout, }) if err != nil { return err @@ -116,20 +119,10 @@ func (p *PodWatcher) Start(ctx context.Context) error { for p.IsRunning() { select { - case evt, ok := <-w.ResultChan(): + case evt := <-w.ResultChan(): logrus.Debugf("Pod Watcher received event: %s", evt.Type) - if evt.Object == nil { - logrus.Warnf("Received evt with nil object: %+v", evt) - timer.Reset(p.getTimeout()) - continue - } - pod := evt.Object.(*apicorev1.Pod) - if !ok { - return errors.New("Pod Watcher channel closed") - } - switch evt.Type { case "", watch.Error: logrus.Errorf("Received empty event or error from pod watcher: %+v", evt) diff --git a/internal/watcher/pod_test.go b/internal/watcher/pod_test.go index cc375e3..bf9a5a9 100644 --- a/internal/watcher/pod_test.go +++ b/internal/watcher/pod_test.go @@ -154,37 +154,6 @@ func TestPodWatcher_BasicBehaviour(t *gtest.T) { <-done } -func TestPodWatcher_Error(t *gtest.T) { - logrus.SetLevel(logrus.DebugLevel) - clientset := fake.NewSimpleClientset() - recorder := record.NewFakeRecorder(1024) - p := watcher.NewPodWatcher(clientset, recorder, "test", "app=name").(*watcher.PodWatcher) - p.Timeout = 1 * time.Second - - done := make(chan interface{}) - defer close(done) - - clientset.PrependWatchReactor("pods", func(action ktest.Action) (handled bool, ret watch.Interface, err error) { - fakeWatch := watch.NewFake() - go func() { - fakeWatch.Error(createPod("test")) - }() - - return true, fakeWatch, nil - }) - - // Start the watcher in background, it should fail - go func() { - if err := p.Start(context.Background()); err == nil { - t.Error("Was expecting an error") - } - - done <- nil - }() - - <-done -} - func TestPodWatcher_DeletePods(t *gtest.T) { logrus.SetLevel(logrus.DebugLevel) clientset := fake.NewSimpleClientset() @@ -340,50 +309,3 @@ func TestPodWatcher_NotEnabled(t *gtest.T) { cancel() <-done } - -func TestPodWatcher_NilObject(t *gtest.T) { - logrus.SetLevel(logrus.DebugLevel) - clientset := fake.NewSimpleClientset() - recorder := record.NewFakeRecorder(1024) - p := watcher.NewPodWatcher(clientset, recorder, "test", "app=name").(*watcher.PodWatcher) - fakeWatch := watch.NewFake() - - p.SetTimeout(100 * time.Millisecond) - p.SetEnabled(true) - - done := make(chan interface{}) - - // Create a cancellable context - ctx, cancel := context.WithCancel(context.Background()) - - clientset.PrependWatchReactor("pods", func(action ktest.Action) (handled bool, ret watch.Interface, err error) { - go func() { - // Add a bunch of pods at regular intervals - for range [10]int{} { - fakeWatch.Add(nil) - time.Sleep(50 * time.Millisecond) - } - }() - return true, fakeWatch, nil - }) - - // Start watcher in background - go func() { - if err := p.Start(ctx); err != nil { - t.Error(err) - } - - done <- nil - }() - - // Wait for the events to be processed - time.Sleep(1 * time.Second) - - // Despite the nil event, the watcher is still running - if !p.IsRunning() { - t.Error("Watcher stopped") - } - - cancel() - <-done -}