Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor operator in preparation for leader-election #6

Merged
merged 8 commits into from
Sep 7, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions cmd/transflect-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,9 @@ func newOperator(cfg *config) (*operator, error) {
return nil, err
}

informerFactory := informers.NewSharedInformerFactory(k8s, time.Second*30)

rsInformer := informerFactory.Apps().V1().ReplicaSets()
if err := rsInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil {
return nil, errors.Wrap(err, "cannot add custom error handler to replicaset informer")
}
deployInformer := informerFactory.Apps().V1().Deployments()
if err := deployInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil {
return nil, errors.Wrap(err, "cannot add custom error handler to deployment informer")
}

op := &operator{
k8s: k8s,
istio: istio,
rsInformer: rsInformer,
deployInformer: deployInformer,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
stopper: make(chan struct{}),
k8s: k8s,
istio: istio,

useIngress: cfg.UseIngress,
plaintext: cfg.Plaintext,
Expand All @@ -96,20 +81,36 @@ func watchErrorHandler(_ *cache.Reflector, err error) {
}

func (o *operator) start() error {
// Initialise activeState for existing transflect EnvoyFilters
// to determine if EnvoyFilter upsert should be processed.
if err := o.syncActive(); err != nil {
return fmt.Errorf("cannot start operator: %w", err)
}

// Initialise work-queue and stopper channel
o.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
o.stopper = make(chan struct{})

// Initialise informers
informerFactory := informers.NewSharedInformerFactory(o.k8s, time.Second*30)
o.rsInformer = informerFactory.Apps().V1().ReplicaSets()
if err := o.rsInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil {
return errors.Wrap(err, "cannot add custom error handler to replicaset informer")
}
o.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.enqueue,
UpdateFunc: o.enqueueWithOld,
})

o.deployInformer = informerFactory.Apps().V1().Deployments()
if err := o.deployInformer.Informer().SetWatchErrorHandler(watchErrorHandler); err != nil {
return errors.Wrap(err, "cannot add custom error handler to deployment informer")
}
o.deployInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: o.cleanupDeployment,
})
log.Debug().Msg("Start leading: informer event handlers registered")

log.Debug().Msg("Starting: informer event handlers registered")
// Run workers and informers
o.runWorkers(42)
o.rsInformer.Informer().Run(o.stopper)
o.deployInformer.Informer().Run(o.stopper)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two calls to Run() are not done asynchronously. From the Run docs:

Run starts and runs the shared informer, returning after it stops.

So the deployInformer does not start until the rsInformer stops.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yesss!! thanks for the eagle eye yet again.

Expand Down