Skip to content

Commit

Permalink
feat: add ratelimit for job in manager (#3480)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Sep 3, 2024
1 parent 2c0ae78 commit 75d6242
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 2 deletions.
37 changes: 36 additions & 1 deletion manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,17 @@ type GRPCConfig struct {
}

type TCPListenPortRange struct {
// Start is the start port.
Start int
End int

// End is the end port.
End int
}

type JobConfig struct {
// RateLimit configuration.
RateLimit RateLimitConfig `yaml:"rateLimit" mapstructure:"rateLimit"`

// Preheat configuration.
Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"`

Expand All @@ -303,6 +309,18 @@ type PreheatConfig struct {
TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"`
}

// RateLimitConfig is the configuration for rate limit.
type RateLimitConfig struct {
// FillInterval is the interval between each token added to the bucket.
FillInterval time.Duration `yaml:"fillInterval" mapstructure:"fillInterval"`

// Capacity is the maximum number of tokens in the bucket.
Capacity int64 `yaml:"capacity" mapstructure:"capacity"`

// Quantum is the number of tokens taken from the bucket for each request.
Quantum int64 `yaml:"quantum" mapstructure:"quantum"`
}

type SyncPeersConfig struct {
// Interval is the interval for syncing all peers information from the scheduler and
// display peers information in the manager console.
Expand Down Expand Up @@ -437,6 +455,11 @@ func New() *Config {
},
},
Job: JobConfig{
RateLimit: RateLimitConfig{
FillInterval: DefaultJobRateLimitFillInterval,
Capacity: DefaultJobRateLimitCapacity,
Quantum: DefaultJobRateLimitQuantum,
},
Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
},
Expand Down Expand Up @@ -606,6 +629,18 @@ func (cfg *Config) Validate() error {
return errors.New("local requires parameter ttl")
}

if cfg.Job.RateLimit.FillInterval == 0 {
return errors.New("rateLimit requires parameter fillInterval")
}

if cfg.Job.RateLimit.Capacity == 0 {
return errors.New("rateLimit requires parameter capacity")
}

if cfg.Job.RateLimit.Quantum == 0 {
return errors.New("rateLimit requires parameter quantum")
}

if cfg.Job.Preheat.TLS != nil {
if cfg.Job.Preheat.TLS.CACert == "" {
return errors.New("preheat requires parameter caCert")
Expand Down
50 changes: 50 additions & 0 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func TestConfig_Load(t *testing.T) {
},
},
Job: JobConfig{
RateLimit: RateLimitConfig{
FillInterval: 1 * time.Second,
Capacity: 1000,
Quantum: 1000,
},
Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
TLS: &PreheatTLSClientConfig{
Expand Down Expand Up @@ -697,6 +702,51 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "local requires parameter ttl")
},
},
{
name: "rateLimit requires parameter fillInterval",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.RateLimit.FillInterval = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rateLimit requires parameter fillInterval")
},
},
{
name: "rateLimit requires parameter capacity",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.RateLimit.Capacity = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rateLimit requires parameter capacity")
},
},
{
name: "rateLimit requires parameter quantum",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.RateLimit.Quantum = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rateLimit requires parameter quantum")
},
},
{
name: "preheat requires parameter caCert",
config: New(),
Expand Down
9 changes: 9 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ const (
)

const (
// DefaultJobRateLimitFillInterval is the default fill interval for job rate limit.
DefaultJobRateLimitFillInterval = 1 * time.Minute

// DefaultJobRateLimitCapacity is the default capacity for job rate limit.
DefaultJobRateLimitCapacity = 100

// DefaultJobRateLimitQuantum is the default quantum for job rate limit.
DefaultJobRateLimitQuantum = 100

// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
DefaultJobPreheatRegistryTimeout = 1 * time.Minute

Expand Down
4 changes: 4 additions & 0 deletions manager/config/testdata/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ cache:
ttl: 1s

job:
rateLimit:
fillInterval: 1s
capacity: 1000
quantum: 1000
preheat:
registryTimeout: 1m
tls:
Expand Down
39 changes: 39 additions & 0 deletions manager/middlewares/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package middlewares

import (
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/juju/ratelimit"
)

func RateLimit(fillInterval time.Duration, capacity, quantum int64) gin.HandlerFunc {
bucket := ratelimit.NewBucketWithQuantum(fillInterval, capacity, quantum)

return func(c *gin.Context) {
if bucket.TakeAvailable(1) < 1 {
c.String(http.StatusTooManyRequests, "rate limit exceeded")
c.Abort()
return
}

c.Next()
}
}

0 comments on commit 75d6242

Please sign in to comment.