Skip to content

Commit

Permalink
Increase concurrency of reconcile scenario
Browse files Browse the repository at this point in the history
The scenario wasn't able to generate the desired load as the reconciler's throuput was limited by the number of concurrent workers
(all workers were active all the time and work items were piling up in the queue).
  • Loading branch information
timebertt committed Aug 23, 2023
1 parent 902d05a commit ca256de
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
36 changes: 24 additions & 12 deletions webhosting-operator/pkg/experiment/generator/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,36 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const reconcileWorkers = 10
const defaultReconcileWorkers = 10

// Every runs the given Func with the specified frequency.
type Every struct {
client.Client

Name string
Do func(ctx context.Context, c client.Client) error
Rate rate.Limit
Stop time.Time
Name string
Do func(ctx context.Context, c client.Client) error
Rate rate.Limit
Stop time.Time
Workers int
}

func (r *Every) AddToManager(mgr manager.Manager) error {
if r.Client == nil {
r.Client = mgr.GetClient()
}

workers := defaultReconcileWorkers
if r.Workers > 0 {
workers = r.Workers
}

return builder.ControllerManagedBy(mgr).
Named(r.Name).
WithOptions(controller.Options{
MaxConcurrentReconciles: reconcileWorkers,
MaxConcurrentReconciles: workers,
RateLimiter: &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(r.Rate, int(r.Rate))},
}).
WatchesRawSource(EmitN(reconcileWorkers), &handler.EnqueueRequestForObject{}).
WatchesRawSource(EmitN(workers), &handler.EnqueueRequestForObject{}).
Complete(StopOnContextCanceled(r))
}

Expand All @@ -77,10 +83,11 @@ func (r *Every) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
type ForEach[T client.Object] struct {
client.Client

Name string
Do func(ctx context.Context, c client.Client, obj T) error
Every time.Duration
Stop time.Time
Name string
Do func(ctx context.Context, c client.Client, obj T) error
Every time.Duration
Stop time.Time
Workers int

gvk schema.GroupVersionKind
obj T
Expand All @@ -100,10 +107,15 @@ func (r *ForEach[T]) AddToManager(mgr manager.Manager) error {
return err
}

workers := defaultReconcileWorkers
if r.Workers > 0 {
workers = r.Workers
}

return builder.ControllerManagedBy(mgr).
Named(r.Name).
WithOptions(controller.Options{
MaxConcurrentReconciles: reconcileWorkers,
MaxConcurrentReconciles: workers,
RateLimiter: unlimitedRateLimiter(),
}).
Watches(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ func (s *scenario) Prepare(ctx context.Context) error {
}

func (s *scenario) Run(ctx context.Context) error {
// trigger individual reconciliations for website every 10s
// trigger individual reconciliations for websites every 10s
if err := (&generator.ForEach[*webhostingv1alpha1.Website]{
Name: "website-reconcile-trigger",
Do: func(ctx context.Context, c client.Client, obj *webhostingv1alpha1.Website) error {
return client.IgnoreNotFound(generator.ReconcileWebsite(ctx, c, obj))
},
Every: 10 * time.Second,
Every: 10 * time.Second,
Workers: 50,
}).AddToManager(s.Manager); err != nil {
return fmt.Errorf("error adding website-reconcile-trigger: %w", err)
}
Expand Down

0 comments on commit ca256de

Please sign in to comment.