Skip to content

Commit

Permalink
feat(restart): implement restart strategy for CRD Watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Massimo Gengarelli committed Jun 30, 2024
1 parent b0aa7ca commit 19ebf40
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 15 deletions.
83 changes: 69 additions & 14 deletions internal/watcher/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ type CrdWatcher struct {
Client kubernetes.Interface
Mutex *sync.Mutex
DeploymentWatchers map[string]ConfigurableWatcher
ForceStopChan chan interface{}
Namespace string
Running bool
CleanupTimeout time.Duration
WatcherTimeout time.Duration
Running bool
}

var _ = (Watcher)((*CrdWatcher)(nil))
Expand All @@ -49,15 +51,17 @@ func NewCrdWatcher(clientset kubernetes.Interface, cmcClientset typedcmc.Interfa

return &CrdWatcher{
ChaosMonkeyConfigurationInterface: cmcClientset.ChaosMonkeyConfigurationV1alpha1().ChaosMonkeyConfigurations(namespace),

DeploymentInterface: clientset.AppsV1().Deployments(namespace),
EventRecorderLogger: recorder,
Mutex: &sync.Mutex{},
Namespace: namespace,
Running: false,
DeploymentWatchers: map[string]ConfigurableWatcher{},
Client: clientset,
CleanupTimeout: 1 * time.Minute,
DeploymentInterface: clientset.AppsV1().Deployments(namespace),
EventRecorderLogger: recorder,

Client: clientset,
Mutex: &sync.Mutex{},
DeploymentWatchers: map[string]ConfigurableWatcher{},
ForceStopChan: make(chan interface{}),
Namespace: namespace,
CleanupTimeout: 15 * time.Minute,
WatcherTimeout: 24 * time.Hour,
Running: false,
}
}

Expand All @@ -75,7 +79,7 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
var err error
var wg sync.WaitGroup

watchTimeout := int64((24 * time.Hour).Seconds())
watchTimeout := int64(c.WatcherTimeout.Seconds())
w, err := c.ChaosMonkeyConfigurationInterface.Watch(ctx, metav1.ListOptions{
Watch: true,
TimeoutSeconds: &watchTimeout,
Expand All @@ -91,12 +95,22 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
for c.IsRunning() {
select {
case evt := <-w.ResultChan():
cmc := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration)
cmc, ok := evt.Object.(*v1alpha1.ChaosMonkeyConfiguration)
if !ok {
logrus.Warnf("Watch for %s timed out", c.Namespace)
w, err = c.restartWatchers(ctx, &wg)
if err != nil {
logrus.Errorf("Error while restarting watchers: %s", err)
c.setRunning(false)
}

break
}

switch evt.Type {
case "", watch.Error:
logrus.Errorf("Received empty error or event from CRD watcher: %+v", evt)
_ = c.Stop()
c.setRunning(false)
err = errors.New("Empty event or error from CRD watcher")

case watch.Added:
Expand Down Expand Up @@ -145,13 +159,19 @@ func (c *CrdWatcher) Start(ctx context.Context) error {
logrus.Debug("All is good! Publishing event.")
c.EventRecorderLogger.Eventf(cmc, "Normal", "Deleted", "Watcher deleted for deployment %s", cmc.Spec.DeploymentName)
}

case <-ctx.Done():
logrus.Infof("Watcher context done")
_ = c.Stop()
c.setRunning(false)

case <-time.After(c.CleanupTimeout):
logrus.Debug("Garbage collecting Chaos Monkeys")
c.cleanUp()

case <-c.ForceStopChan:
// This is here just to wake up early from the loop
logrus.Info("Force stopping CRD Watcher")
c.setRunning(false)
}
}

Expand Down Expand Up @@ -180,6 +200,15 @@ func (c *CrdWatcher) Stop() error {
c.Running = false

logrus.Debugf("Stopping CRD watcher for %s", c.Namespace)
logrus.Debug("Force stopping")

select {
case c.ForceStopChan <- nil:
default:
logrus.Warn("Could not write to ForceStopChannel")
}

close(c.ForceStopChan)
return nil
}

Expand Down Expand Up @@ -313,3 +342,29 @@ func (c *CrdWatcher) cleanUp() {
}
}
}

// restartWatchers tears down every tracked deployment watcher and then
// opens a fresh watch on the ChaosMonkeyConfiguration resources, returning
// the new watch interface (or the Watch error). It is invoked when the
// current watch channel stops delivering typed events.
//
// NOTE(review): wg.Wait() runs while c.Mutex is held — this assumes the
// watcher goroutines never need this mutex to terminate, otherwise the
// call deadlocks; confirm against the goroutines spawned in Start.
func (c *CrdWatcher) restartWatchers(ctx context.Context, wg *sync.WaitGroup) (watch.Interface, error) {
	c.Mutex.Lock()
	defer c.Mutex.Unlock()

	logrus.Infof("Restarting CRD Watcher for %s", c.Namespace)

	// Stop and forget every per-deployment watcher before re-watching.
	logrus.Debug("Cleaning existing watchers")
	for name, dw := range c.DeploymentWatchers {
		logrus.Debugf("Stopping watcher for %s", name)
		if stopErr := dw.Stop(); stopErr != nil {
			logrus.Warnf("Error while stopping watcher for %s: %s", name, stopErr)
		}

		delete(c.DeploymentWatchers, name)
	}

	// Block until all watcher goroutines have signalled completion.
	logrus.Info("Waiting for monkeys to get back home")
	wg.Wait()

	// Re-open the CMC watch with the configured server-side timeout.
	timeout := int64(c.WatcherTimeout.Seconds())
	return c.ChaosMonkeyConfigurationInterface.Watch(ctx, metav1.ListOptions{
		Watch:          true,
		TimeoutSeconds: &timeout,
	})
}
78 changes: 78 additions & 0 deletions internal/watcher/crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package watcher_test

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -339,3 +341,79 @@ func TestCRDWatcher_Cleanup(t *testing.T) {
t.Error("Watcher should be stopped")
}
}

// TestCRDWatcher_Restart verifies that the CRD watcher transparently
// re-establishes its watch when the underlying channel closes: each fake
// watch emits ten CMC events (100ms apart) and then stops, forcing a
// restart. After ~4.3s the watcher must still be running and must have
// opened the watch exactly 5 times.
func TestCRDWatcher_Restart(t *testing.T) {
	clientSet := kubernetes.NewSimpleClientset()
	cmClientset := cmc.NewSimpleClientset()
	w := watcher.DefaultCrdFactory(clientSet, cmClientset, record.NewFakeRecorder(1024), "chaos-monkey").(*watcher.CrdWatcher)
	w.CleanupTimeout = 1 * time.Second

	// Counts how many times the watch reactor below was invoked.
	restartCount := &atomic.Int32{}
	restartCount.Store(0)

	// Deployment watchers are stubbed out; this test only exercises the CRD loop.
	watcher.DefaultDeploymentFactory = func(clientset k.Interface, recorder record.EventRecorderLogger, deployment *appsv1.Deployment) watcher.ConfigurableWatcher {
		return &FakeDeploymentWatcher{Mutex: &sync.Mutex{}}
	}

	// Each new watch delivers 10 CMC events, one every 100ms, then closes,
	// which should trigger a watcher restart.
	cmClientset.PrependWatchReactor("chaosmonkeyconfigurations", func(action ktest.Action) (handled bool, ret watch.Interface, err error) {
		fw := watch.NewFake()
		restartCount.Add(1)

		go func() {
			for i := 0; i < 10; i++ {
				name := fmt.Sprintf("test-%d", i)
				fw.Add(&v1alpha1.ChaosMonkeyConfiguration{
					ObjectMeta: metav1.ObjectMeta{Name: name},
					Spec: v1alpha1.ChaosMonkeyConfigurationSpec{
						PodMode:        false,
						Timeout:        "10s",
						MinReplicas:    0,
						MaxReplicas:    10,
						Enabled:        false,
						DeploymentName: name,
					},
				})
				time.Sleep(100 * time.Millisecond)
			}

			fw.Stop()
		}()

		return true, fw, nil
	})

	// Serve a minimal Deployment object for any "get deployments" call.
	clientSet.PrependReactor("get", "deployments", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
		name := action.(ktest.GetAction).GetName()
		return true, &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: name}}, nil
	})

	// Run the watcher in the background.
	done := make(chan interface{})
	defer close(done)

	go func() {
		if err := w.Start(context.Background()); err != nil {
			t.Error(err)
		}

		done <- nil
	}()

	time.Sleep(4300 * time.Millisecond)

	// It should still be running
	w.Mutex.Lock()
	if !w.Running {
		t.Error("Watcher should be running")
	}
	w.Mutex.Unlock()

	// Now stop it and verify that the watcher restarted 5 times
	if err := w.Stop(); err != nil {
		t.Error(err)
	}
	<-done

	if restartCount.Load() != 5 {
		t.Errorf("Expected 5 restarts, got %d", restartCount.Load())
	}
}
2 changes: 1 addition & 1 deletion internal/watcher/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *PodWatcher) Start(ctx context.Context) error {
timer.Reset(p.getTimeout())
}

logrus.Infof("Stopping pod watcher in namespace %s", p.Namespace)
logrus.Infof("Pod watcher in namespace %s finished", p.Namespace)
return err
}

Expand Down

0 comments on commit 19ebf40

Please sign in to comment.