diff --git a/webhosting-operator/pkg/experiment/generator/reconciler.go b/webhosting-operator/pkg/experiment/generator/reconciler.go index bd187ceb..da911745 100644 --- a/webhosting-operator/pkg/experiment/generator/reconciler.go +++ b/webhosting-operator/pkg/experiment/generator/reconciler.go @@ -37,16 +37,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -const reconcileWorkers = 10 +const defaultReconcileWorkers = 10 // Every runs the given Func with the specified frequency. type Every struct { client.Client - Name string - Do func(ctx context.Context, c client.Client) error - Rate rate.Limit - Stop time.Time + Name string + Do func(ctx context.Context, c client.Client) error + Rate rate.Limit + Stop time.Time + Workers int } func (r *Every) AddToManager(mgr manager.Manager) error { @@ -54,13 +55,18 @@ func (r *Every) AddToManager(mgr manager.Manager) error { r.Client = mgr.GetClient() } + workers := defaultReconcileWorkers + if r.Workers > 0 { + workers = r.Workers + } + return builder.ControllerManagedBy(mgr). Named(r.Name). WithOptions(controller.Options{ - MaxConcurrentReconciles: reconcileWorkers, + MaxConcurrentReconciles: workers, RateLimiter: &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(r.Rate, int(r.Rate))}, }). - WatchesRawSource(EmitN(reconcileWorkers), &handler.EnqueueRequestForObject{}). + WatchesRawSource(EmitN(workers), &handler.EnqueueRequestForObject{}). Complete(StopOnContextCanceled(r)) } @@ -77,10 +83,11 @@ func (r *Every) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R type ForEach[T client.Object] struct { client.Client - Name string - Do func(ctx context.Context, c client.Client, obj T) error - Every time.Duration - Stop time.Time + Name string + Do func(ctx context.Context, c client.Client, obj T) error + Every time.Duration + Stop time.Time + Workers int gvk schema.GroupVersionKind obj T @@ -100,10 +107,15 @@ func (r *ForEach[T]) AddToManager(mgr manager.Manager) error { return err } + workers := defaultReconcileWorkers + if r.Workers > 0 { + workers = r.Workers + } + return builder.ControllerManagedBy(mgr). Named(r.Name). WithOptions(controller.Options{ - MaxConcurrentReconciles: reconcileWorkers, + MaxConcurrentReconciles: workers, RateLimiter: unlimitedRateLimiter(), }). Watches( diff --git a/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go b/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go index 2cb2aee8..c6ec63e1 100644 --- a/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go +++ b/webhosting-operator/pkg/experiment/scenario/reconcile/reconcile.go @@ -74,13 +74,14 @@ func (s *scenario) Prepare(ctx context.Context) error { } func (s *scenario) Run(ctx context.Context) error { - // trigger individual reconciliations for website every 10s + // trigger individual reconciliations for websites every 10s if err := (&generator.ForEach[*webhostingv1alpha1.Website]{ Name: "website-reconcile-trigger", Do: func(ctx context.Context, c client.Client, obj *webhostingv1alpha1.Website) error { return client.IgnoreNotFound(generator.ReconcileWebsite(ctx, c, obj)) }, - Every: 10 * time.Second, + Every: 10 * time.Second, + Workers: 50, }).AddToManager(s.Manager); err != nil { return fmt.Errorf("error adding website-reconcile-trigger: %w", err) }