Skip to content

Commit

Permalink
style: change logs for PodWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Massimo Gengarelli committed Jul 1, 2024
1 parent 0f3068b commit a2e8f9a
Showing 1 changed file with 33 additions and 29 deletions.
62 changes: 33 additions & 29 deletions internal/watcher/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type PodWatcher struct {
corev1.PodInterface
record.EventRecorderLogger

Logrus logrus.FieldLogger
Mutex *sync.Mutex
Namespace string
LabelSelector string
Expand All @@ -48,13 +49,16 @@ func NewPodWatcher(clientset kubernetes.Interface, recorder record.EventRecorder
recorder = bc.NewRecorder(scheme.Scheme, apicorev1.EventSource{Component: "chaos-monkey"})
}

combinedSelector := strings.Join(labelSelector, ",")

return &PodWatcher{
PodInterface: clientset.CoreV1().Pods(namespace),
EventRecorderLogger: recorder,

Logrus: logrus.WithFields(logrus.Fields{"component": "PodWatcher", "namespace": namespace, "labelSelector": labelSelector}),
Mutex: &sync.Mutex{},
Namespace: namespace,
LabelSelector: strings.Join(labelSelector, ","),
LabelSelector: combinedSelector,
PodList: []*apicorev1.Pod{},
Timeout: 30 * time.Second,
WatchTimeout: 15 * time.Minute,
Expand All @@ -69,7 +73,7 @@ func (p *PodWatcher) SetTimeout(v time.Duration) {
p.Mutex.Lock()
defer p.Mutex.Unlock()

logrus.Debugf("Setting new timeout: %s, old timeout: %s", v, p.Timeout)
p.Logrus.Debugf("Setting new timeout: %s, old timeout: %s", v, p.Timeout)
p.Timeout = v
}

Expand Down Expand Up @@ -102,7 +106,7 @@ func (p *PodWatcher) IsRunning() bool {
// Start implements Watcher.
func (p *PodWatcher) Start(ctx context.Context) error {
var err error
logrus.Infof("Starting pod watcher in namespace %s", p.Namespace)
p.Logrus.Info("Starting pod watcher")

watchTimeout := int64(p.WatchTimeout.Seconds())
w, err := p.Watch(ctx, metav1.ListOptions{
Expand All @@ -125,60 +129,60 @@ func (p *PodWatcher) Start(ctx context.Context) error {
select {
case evt, ok := <-w.ResultChan():
if !ok {
logrus.Warn("Watcher timeout")
p.Logrus.Warn("Watcher timeout")
if w, err = p.restartWatch(ctx); err != nil {
logrus.Error(err)
p.Logrus.Error(err)
p.setRunning(false)
}

break
}

logrus.Debugf("Pod Watcher received event: %s", evt.Type)
p.Logrus.Debugf("Pod Watcher received event: %s", evt.Type)
pod := evt.Object.(*apicorev1.Pod)

switch evt.Type {
case watch.Error:
logrus.Errorf("Received empty event or error from pod watcher: %+v", evt)
p.Logrus.Errorf("Received empty event or error from pod watcher: %+v", evt)
err = errors.New("Empty event or error from pod watcher")
p.setRunning(false)
case watch.Added:
logrus.Infof("Adding pod to list: %s", pod.Name)
p.Logrus.Infof("Adding pod to list: %s", pod.Name)
p.addPodToList(pod)
p.Eventf(pod, apicorev1.EventTypeNormal, "ChaosMonkeyTarget", eventFormat, p.getTimeout())
logrus.Debug("Pod added, event sent!")
p.Logrus.Debug("Pod added, event sent!")
case watch.Deleted:
logrus.Infof("Removing pod from list: %s", pod.Name)
p.Logrus.Infof("Removing pod from list: %s", pod.Name)
p.removePodFromList(pod)
case watch.Modified:
logrus.Debugf("Ignoring modification of pod %s", pod.Name)
p.Logrus.Debugf("Ignoring modification of pod %s", pod.Name)
}
case <-ctx.Done():
logrus.Info("Pod Watcher context done")
p.Logrus.Info("Pod Watcher context done")
p.setRunning(false)
case <-timer.C:
if !p.isEnabled() {
logrus.Debug("CRD not enabled, refusing to disrupt pods")
p.Logrus.Debug("CRD not enabled, refusing to disrupt pods")
timer.Reset(p.getTimeout())
continue
}

logrus.Infof("Disrupting random pod from namespace %s", p.Namespace)
p.Logrus.Info("Disrupting random pod")
if randomPod, err := p.getRandomPod(); err != nil {
logrus.Warnf("Warning: %s", err)
p.Logrus.Warnf("Warning: %s", err)
} else {
logrus.Infof("Disrupting pod %s", randomPod.Name)
p.Logrus.Infof("Disrupting pod %s", randomPod.Name)
gracePeriod := int64(0)
if err := p.Delete(ctx, randomPod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil {
logrus.Errorf("Could not disrupt pod: %s", err)
p.Logrus.Errorf("Could not disrupt pod: %s", err)
} else {
p.Event(
randomPod,
apicorev1.EventTypeNormal,
"ChaosMonkeyTarget",
"Pod got killed by Chaos Monkey",
)
logrus.Debug("Pod disrupted, event sent!")
p.Logrus.Debug("Pod disrupted, event sent!")
}
}

Expand All @@ -189,13 +193,13 @@ func (p *PodWatcher) Start(ctx context.Context) error {
}
p.Mutex.Unlock()
case <-p.ForceStopChan:
logrus.Info("Force stopped watcher")
p.Logrus.Info("Force stopped watcher")
}

timer.Reset(p.getTimeout())
}

logrus.Infof("Pod watcher in namespace %s finished", p.Namespace)
p.Logrus.Info("Pod watcher finished")
return err
}

Expand All @@ -204,14 +208,14 @@ func (p *PodWatcher) Stop() error {
p.Mutex.Lock()
defer p.Mutex.Unlock()

logrus.Infof("Stopping pod watcher in namespace %s", p.Namespace)
p.Logrus.Infof("Stopping pod watcher")

p.Running = false

select {
case p.ForceStopChan <- nil:
default:
logrus.Warn("Could not write in channel")
p.Logrus.Warn("Could not write in channel")
}

return nil
Expand Down Expand Up @@ -242,21 +246,21 @@ func (p *PodWatcher) addPodToList(pod *apicorev1.Pod) {
p.Mutex.Lock()
defer p.Mutex.Unlock()

logrus.Debugf("Current pod list size: %d", len(p.PodList))
p.Logrus.Debugf("Current pod list size: %d", len(p.PodList))
p.PodList = append(p.PodList, pod)
logrus.Debugf("Final pod list size: %d", len(p.PodList))
p.Logrus.Debugf("Final pod list size: %d", len(p.PodList))
}

func (p *PodWatcher) removePodFromList(pod *apicorev1.Pod) {
p.Mutex.Lock()
defer p.Mutex.Unlock()

logrus.Debugf("Current pod list size: %d", len(p.PodList))
p.Logrus.Debugf("Current pod list size: %d", len(p.PodList))
p.PodList = slices.DeleteFunc(p.PodList, func(v *apicorev1.Pod) bool {
logrus.Debugf("Checking pod %s against %s: %+v", v.Name, pod.Name, v.Name == pod.Name)
p.Logrus.Debugf("Checking pod %s against %s: %+v", v.Name, pod.Name, v.Name == pod.Name)
return v.Name == pod.Name
})
logrus.Debugf("Final pod list size: %d", len(p.PodList))
p.Logrus.Debugf("Final pod list size: %d", len(p.PodList))
}

func (p *PodWatcher) getRandomPod() (*apicorev1.Pod, error) {
Expand All @@ -273,11 +277,11 @@ func (p *PodWatcher) restartWatch(ctx context.Context) (watch.Interface, error)
p.Mutex.Lock()
defer p.Mutex.Unlock()

logrus.Infof("Resetting pod watcher for %s", p.LabelSelector)
p.Logrus.Info("Resetting pod watcher")

// Remove all the recorded pods
p.PodList = []*apicorev1.Pod{}
logrus.Debugf("Cleaned pod list for %s", p.LabelSelector)
p.Logrus.Debug("Cleaned pod list")

timeoutSeconds := int64(p.WatchTimeout.Seconds())
return p.Watch(ctx, metav1.ListOptions{
Expand Down

0 comments on commit a2e8f9a

Please sign in to comment.