Skip to content

Commit

Permalink
feat: restart pod watcher when timeout expires
Browse files Browse the repository at this point in the history
  • Loading branch information
massix authored and Massimo Gengarelli committed Jul 1, 2024
1 parent 7d150f7 commit 82c19fd
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
51 changes: 46 additions & 5 deletions internal/watcher/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type PodWatcher struct {
LabelSelector string
PodList []*apicorev1.Pod
Timeout time.Duration
WatchTimeout time.Duration
ForceStopChan chan interface{}
Enabled bool
Running bool
}
Expand Down Expand Up @@ -55,6 +57,8 @@ func NewPodWatcher(clientset kubernetes.Interface, recorder record.EventRecorder
LabelSelector: strings.Join(labelSelector, ","),
PodList: []*apicorev1.Pod{},
Timeout: 30 * time.Second,
WatchTimeout: 45 * time.Minute,
ForceStopChan: make(chan interface{}),
Enabled: true,
Running: false,
}
Expand Down Expand Up @@ -100,7 +104,7 @@ func (p *PodWatcher) Start(ctx context.Context) error {
var err error
logrus.Infof("Starting pod watcher in namespace %s", p.Namespace)

watchTimeout := int64((24 * time.Hour).Seconds())
watchTimeout := int64(p.WatchTimeout.Seconds())
w, err := p.Watch(ctx, metav1.ListOptions{
Watch: true,
LabelSelector: p.LabelSelector,
Expand All @@ -119,15 +123,25 @@ func (p *PodWatcher) Start(ctx context.Context) error {

for p.IsRunning() {
select {
case evt := <-w.ResultChan():
case evt, ok := <-w.ResultChan():
if !ok {
logrus.Warn("Watcher timeout")
if w, err = p.resetWatcher(ctx); err != nil {
logrus.Error(err)
p.setRunning(false)
}

break
}

logrus.Debugf("Pod Watcher received event: %s", evt.Type)
pod := evt.Object.(*apicorev1.Pod)

switch evt.Type {
case "", watch.Error:
case watch.Error:
logrus.Errorf("Received empty event or error from pod watcher: %+v", evt)
err = errors.New("Empty event or error from pod watcher")
_ = p.Stop()
p.setRunning(false)
case watch.Added:
logrus.Infof("Adding pod to list: %s", pod.Name)
p.addPodToList(pod)
Expand All @@ -141,7 +155,7 @@ func (p *PodWatcher) Start(ctx context.Context) error {
}
case <-ctx.Done():
logrus.Info("Pod Watcher context done")
err = p.Stop()
p.setRunning(false)
case <-timer.C:
if !p.isEnabled() {
logrus.Debug("CRD not enabled, refusing to disrupt pods")
Expand Down Expand Up @@ -174,6 +188,8 @@ func (p *PodWatcher) Start(ctx context.Context) error {
p.Eventf(pod, apicorev1.EventTypeNormal, "ChaosMonkeyTarget", eventFormat, p.Timeout)
}
p.Mutex.Unlock()
case <-p.ForceStopChan:
logrus.Info("Force stopped watcher")
}

timer.Reset(p.getTimeout())
Expand All @@ -191,6 +207,13 @@ func (p *PodWatcher) Stop() error {
logrus.Infof("Stopping pod watcher in namespace %s", p.Namespace)

p.Running = false

select {
case p.ForceStopChan <- nil:
default:
logrus.Warn("Could not write in channel")
}

return nil
}

Expand Down Expand Up @@ -245,3 +268,21 @@ func (p *PodWatcher) getRandomPod() (*apicorev1.Pod, error) {

return p.PodList[rand.Intn(len(p.PodList))], nil
}

// resetWatcher drops every pod recorded so far and opens a new watch on the
// pods matching p.LabelSelector, bounded by p.WatchTimeout.
//
// The watcher mutex is held for the whole call, so the pod list is emptied and
// the new watch is requested without other lock holders observing an
// intermediate state. It returns whatever p.Watch returns: the new watch
// interface, or the error that prevented its creation.
func (p *PodWatcher) resetWatcher(ctx context.Context) (watch.Interface, error) {
	p.Mutex.Lock()
	defer p.Mutex.Unlock()

	logrus.Infof("Resetting pod watcher for %s", p.LabelSelector)

	// Start again from an empty (non-nil) list.
	// NOTE(review): presumably the new watch re-delivers the currently
	// existing pods as Added events, repopulating the list — confirm.
	p.PodList = make([]*apicorev1.Pod, 0)
	logrus.Debugf("Cleaned pod list for %s", p.LabelSelector)

	// ListOptions takes the timeout as a pointer to whole seconds.
	seconds := int64(p.WatchTimeout.Seconds())
	opts := metav1.ListOptions{
		Watch:          true,
		LabelSelector:  p.LabelSelector,
		TimeoutSeconds: &seconds,
	}

	return p.Watch(ctx, opts)
}
59 changes: 59 additions & 0 deletions internal/watcher/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watcher_test
import (
"context"
"fmt"
"sync/atomic"
gtest "testing"
"time"

Expand Down Expand Up @@ -309,3 +310,61 @@ func TestPodWatcher_NotEnabled(t *gtest.T) {
cancel()
<-done
}

// TestPodWatcher_Restart checks that the pod watcher opens a new watch every
// time the current one is closed, and that the pod list is rebuilt from the
// events of the latest watch only.
//
// Every fake watch created by the reactor emits 10 pods, one every 100ms, and
// is then stopped, which closes its result channel. Each watch therefore lasts
// roughly one second, so after the 3 second wait the watch is expected to have
// been opened 3 times.
func TestPodWatcher_Restart(t *gtest.T) {
	logrus.SetLevel(logrus.DebugLevel)
	clientset := fake.NewSimpleClientset()
	recorder := record.NewFakeRecorder(1024)
	p := watcher.NewPodWatcher(clientset, recorder, "test", "app=name").(*watcher.PodWatcher)

	// NOTE(review): the very long timeout is presumably there so that the
	// watcher never disrupts a pod during the test — confirm.
	p.SetTimeout(5 * time.Hour)
	p.SetEnabled(true)

	done := make(chan interface{})

	// Number of times a watch on pods has been requested. The zero value of
	// atomic.Int32 is already 0, so no explicit Store is needed.
	var timesCalled atomic.Int32

	clientset.PrependWatchReactor("pods", func(action ktest.Action) (handled bool, ret watch.Interface, err error) {
		timesCalled.Add(1)
		fakeWatch := watch.NewFake()
		go func() {
			// Add a bunch of pods at regular intervals
			for i := 0; i < 10; i++ {
				fakeWatch.Add(createPod(fmt.Sprintf("test-%d", i+1)))
				time.Sleep(100 * time.Millisecond)
			}

			// Stopping the fake watch closes its result channel, which the
			// watcher is expected to handle by requesting a new watch.
			fakeWatch.Stop()
		}()
		return true, fakeWatch, nil
	})

	go func() {
		if err := p.Start(context.Background()); err != nil {
			t.Error(err)
		}

		done <- nil
	}()

	// Wait for the events to be processed
	time.Sleep(3000 * time.Millisecond)

	// At this point, the watcher should have 10 pods in the list: only the
	// pods delivered by the most recent watch, not an accumulation over all
	// of the restarts.
	p.Mutex.Lock()
	if cnt := len(p.PodList); cnt != 10 {
		t.Errorf("Was expecting 10 pods in the list, got %d instead", cnt)
	}
	p.Mutex.Unlock()

	// And the watch should have been called 3 times
	if cnt := timesCalled.Load(); cnt != 3 {
		t.Errorf("Was expecting 3 watch calls, got %d instead", cnt)
	}

	// We can now stop the watch
	_ = p.Stop()

	// Wait for Start to return so that its t.Error call, if any, happens
	// before the test function exits.
	<-done
}

0 comments on commit 82c19fd

Please sign in to comment.