Skip to content

Commit

Permalink
Merge pull request #9 from massix/fix-channel-closed
Browse files Browse the repository at this point in the history
Fix channel closed
  • Loading branch information
massix authored Jun 29, 2024
2 parents 8ea7eff + ab03319 commit 965892b
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 182 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ TERRAFORM := $(shell which terraform)
DOCKER := $(shell which docker)
APPNAME ?= chaos-monkey
IMAGE ?= chaos-monkey
TAG ?= 2.0.2
TAG ?= 2.0.3

all: bin/$(APPNAME)
.PHONY: clean generate bin/$(APPNAME) image-version cluster-test
Expand Down
10 changes: 4 additions & 6 deletions internal/watcher/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,22 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
var err error
var wg sync.WaitGroup

watchTimeout := int64((24 * time.Hour).Seconds())
w, err := c.ChaosMonkeyConfigurationInterface.Watch(ctx, metav1.ListOptions{
Watch: true,
Watch: true,
TimeoutSeconds: &watchTimeout,
})
if err != nil {
return err
}

defer w.Stop()

c.setRunning(true)

for c.IsRunning() {
select {
case evt := <-w.ResultChan():
if evt.Object == nil {
logrus.Warnf("Received event with nil object: %+v", evt)
continue
}

cmc := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration)

switch evt.Type {
Expand Down
39 changes: 0 additions & 39 deletions internal/watcher/crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,42 +339,3 @@ func TestCRDWatcher_Cleanup(t *testing.T) {
t.Error("Watcher should be stopped")
}
}

// TestCRDWatcher_NilObject checks that the CRD watcher is still reported as
// running after its watch delivers an event whose object is nil.
func TestCRDWatcher_NilObject(t *testing.T) {
	kubeClient := kubernetes.NewSimpleClientset()
	chaosClient := cmc.NewSimpleClientset()
	recorder := record.NewFakeRecorder(1024)

	crdWatcher := watcher.DefaultCrdFactory(kubeClient, chaosClient, recorder, "chaos-monkey").(*watcher.CrdWatcher)
	crdWatcher.CleanupTimeout = 300 * time.Millisecond

	// Every watch opened on the CMC resource is answered with a fake watcher
	// on which Add is called once with a nil object.
	chaosClient.PrependWatchReactor("chaosmonkeyconfigurations", func(_ ktest.Action) (bool, watch.Interface, error) {
		fw := watch.NewFake()
		go fw.Add(nil)
		return true, fw, nil
	})

	// Run the watcher in the background; the goroutine signals once Start returns.
	finished := make(chan struct{})
	defer close(finished)

	go func() {
		err := crdWatcher.Start(context.Background())
		if err != nil {
			t.Error(err)
		}

		finished <- struct{}{}
	}()

	// Leave the watcher some time to consume the nil event.
	time.Sleep(1 * time.Second)

	// The nil event must not have brought the watcher down.
	if running := crdWatcher.IsRunning(); !running {
		t.Error("Watcher stopped")
	}

	_ = crdWatcher.Stop()

	<-finished
}
12 changes: 6 additions & 6 deletions internal/watcher/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ func (n *NamespaceWatcher) Start(ctx context.Context) error {

logrus.Infof("Starting namespace watcher for %s", n.RootNamespace)

w, err := n.Watch(ctx, v1.ListOptions{})
timeoutSeconds := int64((24 * time.Hour).Seconds())
w, err := n.Watch(ctx, v1.ListOptions{
Watch: true,
TimeoutSeconds: &timeoutSeconds,
})
if err != nil {
return err
}

defer w.Stop()

n.setRunning(true)

for n.IsRunning() {
select {
case evt := <-w.ResultChan():
if evt.Object == nil {
logrus.Warnf("Received event with nil object: %+v", evt)
continue
}

ns := evt.Object.(*corev1.Namespace)

switch evt.Type {
Expand Down
39 changes: 0 additions & 39 deletions internal/watcher/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,42 +245,3 @@ func TestNamespaceWatcher_Cleanup(t *testing.T) {
t.Errorf("Expected 0 watchers, got %d", len(w.CrdWatchers))
}
}

// TestNamespaceWatcher_NilObject checks that the namespace watcher is still
// reported as running after its watch delivers an event whose object is nil.
func TestNamespaceWatcher_NilObject(t *testing.T) {
	logrus.SetLevel(logrus.DebugLevel)

	kubeClient := kubernetes.NewSimpleClientset()
	recorder := record.NewFakeRecorder(1024)
	nsWatcher := watcher.DefaultNamespaceFactory(kubeClient, cmcClientset, recorder, "chaos-monkey")
	nsWatcher.(*watcher.NamespaceWatcher).CleanupTimeout = 300 * time.Millisecond

	// Every watch opened on namespaces is answered with a fake watcher on
	// which Add is called once with a nil object.
	kubeClient.PrependWatchReactor("namespaces", func(_ ktest.Action) (bool, watch.Interface, error) {
		fw := watch.NewFake()
		go fw.Add(nil)
		return true, fw, nil
	})

	// Run the watcher in the background; the goroutine signals once Start returns.
	finished := make(chan struct{}, 1)
	defer close(finished)

	go func() {
		err := nsWatcher.Start(context.Background())
		if err != nil {
			t.Error(err)
		}

		finished <- struct{}{}
	}()

	// Leave the watcher some time to process the event.
	time.Sleep(1 * time.Second)

	// The nil event must not have brought the watcher down.
	if running := nsWatcher.IsRunning(); !running {
		t.Error("Watcher stopped")
	}

	_ = nsWatcher.Stop()
	<-finished
}
19 changes: 6 additions & 13 deletions internal/watcher/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,12 @@ func (p *PodWatcher) IsRunning() bool {
func (p *PodWatcher) Start(ctx context.Context) error {
var err error
logrus.Infof("Starting pod watcher in namespace %s", p.Namespace)

watchTimeout := int64((24 * time.Hour).Seconds())
w, err := p.Watch(ctx, metav1.ListOptions{
Watch: true,
LabelSelector: p.LabelSelector,
Watch: true,
LabelSelector: p.LabelSelector,
TimeoutSeconds: &watchTimeout,
})
if err != nil {
return err
Expand All @@ -116,20 +119,10 @@ func (p *PodWatcher) Start(ctx context.Context) error {

for p.IsRunning() {
select {
case evt, ok := <-w.ResultChan():
case evt := <-w.ResultChan():
logrus.Debugf("Pod Watcher received event: %s", evt.Type)
if evt.Object == nil {
logrus.Warnf("Received evt with nil object: %+v", evt)
timer.Reset(p.getTimeout())
continue
}

pod := evt.Object.(*apicorev1.Pod)

if !ok {
return errors.New("Pod Watcher channel closed")
}

switch evt.Type {
case "", watch.Error:
logrus.Errorf("Received empty event or error from pod watcher: %+v", evt)
Expand Down
78 changes: 0 additions & 78 deletions internal/watcher/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,37 +154,6 @@ func TestPodWatcher_BasicBehaviour(t *gtest.T) {
<-done
}

// TestPodWatcher_Error checks that Start returns a non-nil error when the
// fake watch reports an error through its Error method.
func TestPodWatcher_Error(t *gtest.T) {
	logrus.SetLevel(logrus.DebugLevel)

	kubeClient := fake.NewSimpleClientset()
	eventRecorder := record.NewFakeRecorder(1024)
	podWatcher := watcher.NewPodWatcher(kubeClient, eventRecorder, "test", "app=name").(*watcher.PodWatcher)
	podWatcher.Timeout = time.Second

	finished := make(chan interface{})
	defer close(finished)

	// Every watch opened on pods is answered with a fake watcher on which
	// Error is called once, carrying a pod built by createPod.
	kubeClient.PrependWatchReactor("pods", func(_ ktest.Action) (bool, watch.Interface, error) {
		fw := watch.NewFake()
		go func() {
			fw.Error(createPod("test"))
		}()

		return true, fw, nil
	})

	// Run the watcher in the background; Start is expected to return an error.
	go func() {
		err := podWatcher.Start(context.Background())
		if err == nil {
			t.Error("Was expecting an error")
		}

		finished <- nil
	}()

	<-finished
}

func TestPodWatcher_DeletePods(t *gtest.T) {
logrus.SetLevel(logrus.DebugLevel)
clientset := fake.NewSimpleClientset()
Expand Down Expand Up @@ -340,50 +309,3 @@ func TestPodWatcher_NotEnabled(t *gtest.T) {
cancel()
<-done
}

// TestPodWatcher_NilObject checks that the pod watcher is still reported as
// running after its watch delivers a series of events whose object is nil.
func TestPodWatcher_NilObject(t *gtest.T) {
	logrus.SetLevel(logrus.DebugLevel)

	kubeClient := fake.NewSimpleClientset()
	eventRecorder := record.NewFakeRecorder(1024)
	podWatcher := watcher.NewPodWatcher(kubeClient, eventRecorder, "test", "app=name").(*watcher.PodWatcher)

	// A single fake watcher is shared by every invocation of the reactor below.
	sharedWatch := watch.NewFake()

	podWatcher.SetTimeout(100 * time.Millisecond)
	podWatcher.SetEnabled(true)

	finished := make(chan interface{})

	// The context is cancelled at the end of the test to stop the watcher.
	ctx, stop := context.WithCancel(context.Background())

	kubeClient.PrependWatchReactor("pods", func(_ ktest.Action) (bool, watch.Interface, error) {
		go func() {
			// Call Add with a nil object ten times, pausing 50ms after each call.
			for i := 0; i < 10; i++ {
				sharedWatch.Add(nil)
				time.Sleep(50 * time.Millisecond)
			}
		}()
		return true, sharedWatch, nil
	})

	// Run the watcher in the background; the goroutine signals once Start returns.
	go func() {
		err := podWatcher.Start(ctx)
		if err != nil {
			t.Error(err)
		}

		finished <- nil
	}()

	// Leave the watcher some time to process the events.
	time.Sleep(1 * time.Second)

	// The nil events must not have brought the watcher down.
	if running := podWatcher.IsRunning(); !running {
		t.Error("Watcher stopped")
	}

	stop()
	<-finished
}

0 comments on commit 965892b

Please sign in to comment.