diff --git a/manager/config/config.go b/manager/config/config.go index 7085076976e..329051f6f45 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -283,11 +283,17 @@ type GRPCConfig struct { } type TCPListenPortRange struct { + // Start is the start port. Start int - End int + + // End is the end port. + End int } type JobConfig struct { + // RateLimit configuration. + RateLimit RateLimitConfig `yaml:"rateLimit" mapstructure:"rateLimit"` + // Preheat configuration. Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"` @@ -303,6 +309,18 @@ type PreheatConfig struct { TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"` } +// RateLimitConfig is the configuration for rate limit. +type RateLimitConfig struct { + // FillInterval is the interval between each token added to the bucket. + FillInterval time.Duration `yaml:"fillInterval" mapstructure:"fillInterval"` + + // Capacity is the maximum number of tokens in the bucket. + Capacity int64 `yaml:"capacity" mapstructure:"capacity"` + + // Quantum is the number of tokens taken from the bucket for each request. + Quantum int64 `yaml:"quantum" mapstructure:"quantum"` +} + type SyncPeersConfig struct { // Interval is the interval for syncing all peers information from the scheduler and // display peers information in the manager console. @@ -437,6 +455,11 @@ func New() *Config { }, }, Job: JobConfig{ + RateLimit: RateLimitConfig{ + FillInterval: DefaultJobRateLimitFillInterval, + Capacity: DefaultJobRateLimitCapacity, + Quantum: DefaultJobRateLimitQuantum, + }, Preheat: PreheatConfig{ RegistryTimeout: DefaultJobPreheatRegistryTimeout, }, @@ -606,6 +629,18 @@ func (cfg *Config) Validate() error { return errors.New("local requires parameter ttl") } + if cfg.Job.RateLimit.FillInterval == 0 { + return errors.New("rateLimit requires parameter fillInterval") + } + + if cfg.Job.RateLimit.Capacity == 0 { + return errors.New("rateLimit requires parameter capacity") + } + + if cfg.Job.RateLimit.Quantum == 0 { + return errors.New("rateLimit requires parameter quantum") + } + if cfg.Job.Preheat.TLS != nil { if cfg.Job.Preheat.TLS.CACert == "" { return errors.New("preheat requires parameter caCert") diff --git a/manager/config/config_test.go b/manager/config/config_test.go index dd80e914091..93c6d75661b 100644 --- a/manager/config/config_test.go +++ b/manager/config/config_test.go @@ -187,6 +187,11 @@ func TestConfig_Load(t *testing.T) { }, }, Job: JobConfig{ + RateLimit: RateLimitConfig{ + FillInterval: 1 * time.Second, + Capacity: 1000, + Quantum: 1000, + }, Preheat: PreheatConfig{ RegistryTimeout: DefaultJobPreheatRegistryTimeout, TLS: &PreheatTLSClientConfig{ @@ -697,6 +702,51 @@ func TestConfig_Validate(t *testing.T) { assert.EqualError(err, "local requires parameter ttl") }, }, + { + name: "rateLimit requires parameter fillInterval", + config: New(), + mock: func(cfg *Config) { + cfg.Auth.JWT = mockJWTConfig + cfg.Database.Type = DatabaseTypeMysql + cfg.Database.Mysql = mockMysqlConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job.RateLimit.FillInterval = 0 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rateLimit requires parameter fillInterval") + }, + }, + { + name: "rateLimit requires parameter capacity", + config: New(), + mock: func(cfg *Config) { + cfg.Auth.JWT = mockJWTConfig + cfg.Database.Type = DatabaseTypeMysql + cfg.Database.Mysql = mockMysqlConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job.RateLimit.Capacity = 0 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rateLimit requires parameter capacity") + }, + }, + { + name: "rateLimit requires parameter quantum", + config: New(), + mock: func(cfg *Config) { + cfg.Auth.JWT = mockJWTConfig + cfg.Database.Type = DatabaseTypeMysql + cfg.Database.Mysql = mockMysqlConfig + cfg.Database.Redis = mockRedisConfig + cfg.Job.RateLimit.Quantum = 0 + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rateLimit requires parameter quantum") + }, + }, { name: "preheat requires parameter caCert", config: New(), diff --git a/manager/config/constants.go b/manager/config/constants.go index 55976d6a4b4..d1238615722 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -87,6 +87,15 @@ const ( ) const ( + // DefaultJobRateLimitFillInterval is the default fill interval for job rate limit. + DefaultJobRateLimitFillInterval = 1 * time.Minute + + // DefaultJobRateLimitCapacity is the default capacity for job rate limit. + DefaultJobRateLimitCapacity = 100 + + // DefaultJobRateLimitQuantum is the default quantum for job rate limit. + DefaultJobRateLimitQuantum = 100 + // DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest. DefaultJobPreheatRegistryTimeout = 1 * time.Minute diff --git a/manager/config/testdata/manager.yaml b/manager/config/testdata/manager.yaml index 40a56e454cd..4a686be8b1c 100644 --- a/manager/config/testdata/manager.yaml +++ b/manager/config/testdata/manager.yaml @@ -66,6 +66,10 @@ cache: ttl: 1s job: + rateLimit: + fillInterval: 1s + capacity: 1000 + quantum: 1000 preheat: registryTimeout: 1m tls: diff --git a/manager/console b/manager/console index 9b388891697..dec1ab4ba2d 160000 --- a/manager/console +++ b/manager/console @@ -1 +1 @@ -Subproject commit 9b3888916971368ddfd692d5f0427e919df670d4 +Subproject commit dec1ab4ba2dcc0892a959651120b6debbfef6354 diff --git a/manager/middlewares/rate_limit.go b/manager/middlewares/rate_limit.go new file mode 100644 index 00000000000..0abf73a6150 --- /dev/null +++ b/manager/middlewares/rate_limit.go @@ -0,0 +1,39 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package middlewares + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/juju/ratelimit" +) + +func RateLimit(fillInterval time.Duration, capacity, quantum int64) gin.HandlerFunc { + bucket := ratelimit.NewBucketWithQuantum(fillInterval, capacity, quantum) + + return func(c *gin.Context) { + if bucket.TakeAvailable(1) < 1 { + c.String(http.StatusTooManyRequests, "rate limit exceeded") + c.Abort() + return + } + + c.Next() + } +}