From b9c0c37f01899a98a0d927e0fc9be3b7b95cf79b Mon Sep 17 00:00:00 2001 From: Tim Ebert Date: Wed, 23 Aug 2023 09:32:40 +0200 Subject: [PATCH] Increase concurrency of `reconcile` scenario The scenario wasn't able to generate the desired load as the reconciler's throuput was limited by the number of concurrent workers (all workers were active all the time and work items were piling up in the queue). --- .../pkg/experiment/generator/reconciler.go | 36 ++++++++++++------- .../scenario/reconcile/reconcile.go | 3 +- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/webhosting-operator/pkg/experiment/generator/reconciler.go b/webhosting-operator/pkg/experiment/generator/reconciler.go index bd187ceb..da911745 100644 --- a/webhosting-operator/pkg/experiment/generator/reconciler.go +++ b/webhosting-operator/pkg/experiment/generator/reconciler.go @@ -37,16 +37,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -const reconcileWorkers = 10 +const defaultReconcileWorkers = 10 // Every runs the given Func with the specified frequency. type Every struct { client.Client - Name string - Do func(ctx context.Context, c client.Client) error - Rate rate.Limit - Stop time.Time + Name string + Do func(ctx context.Context, c client.Client) error + Rate rate.Limit + Stop time.Time + Workers int } func (r *Every) AddToManager(mgr manager.Manager) error { @@ -54,13 +55,18 @@ func (r *Every) AddToManager(mgr manager.Manager) error { r.Client = mgr.GetClient() } + workers := defaultReconcileWorkers + if r.Workers > 0 { + workers = r.Workers + } + return builder.ControllerManagedBy(mgr). Named(r.Name). WithOptions(controller.Options{ - MaxConcurrentReconciles: reconcileWorkers, + MaxConcurrentReconciles: workers, RateLimiter: &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(r.Rate, int(r.Rate))}, }). - WatchesRawSource(EmitN(reconcileWorkers), &handler.EnqueueRequestForObject{}). + WatchesRawSource(EmitN(workers), &handler.EnqueueRequestForObject{}). Complete(StopOnContextCanceled(r)) } @@ -77,10 +83,11 @@ func (r *Every) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R type ForEach[T client.Object] struct { client.Client - Name string - Do func(ctx context.Context, c client.Client, obj T) error - Every time.Duration - Stop time.Time + Name string + Do func(ctx context.Context, c client.Client, obj T) error + Every time.Duration + Stop time.Time + Workers int gvk schema.GroupVersionKind obj T @@ -100,10 +107,15 @@ func (r *ForEach[T]) AddToManager(mgr manager.Manager) error { return err } + workers := defaultReconcileWorkers + if r.Workers > 0 { + workers = r.Workers + } + return builder.ControllerManagedBy(mgr). Named(r.Name). WithOptions(controller.Options{ - MaxConcurrentReconciles: reconcileWorkers, + MaxConcurrentReconciles: workers, RateLimiter: unlimitedRateLimiter(), }). Watches( diff --git a/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go b/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go index 8b8c813d..dfa729da 100644 --- a/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go +++ b/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go @@ -74,12 +74,13 @@ func (s *scenario) Prepare(ctx context.Context) error { } func (s *scenario) Run(ctx context.Context) error { - // trigger individual reconciliations for website every 10s + // trigger individual reconciliations for websites every 10s if err := (&generator.ForEach[*webhostingv1alpha1.Website]{ Name: "website-reconcile-trigger", Do: generator.ReconcileWebsite, Every: 10 * time.Second, RateLimit: rate.Limit(1000), + Workers: 50, }).AddToManager(s.Manager); err != nil { return fmt.Errorf("error adding website-reconcile-trigger: %w", err) }