diff --git a/README.md b/README.md index 7be183f..f8a0cac 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # 说明 -https://github.com/go-kit/kit +https://github.com/dreamsxin/go-kit diff --git a/endpoint/circuitbreaker/gobreaker.go b/endpoint/circuitbreaker/gobreaker.go new file mode 100644 index 0000000..b0a5c57 --- /dev/null +++ b/endpoint/circuitbreaker/gobreaker.go @@ -0,0 +1,22 @@ +package circuitbreaker + +import ( + "context" + + "github.com/sony/gobreaker" + + "github.com/dreamsxin/go-kit/endpoint" +) + +// Gobreaker returns an endpoint.Middleware that implements the circuit +// breaker pattern using the sony/gobreaker package. Only errors returned by +// the wrapped endpoint count against the circuit breaker's error count. +// +// See http://godoc.org/github.com/sony/gobreaker for more information. +func Gobreaker(cb *gobreaker.CircuitBreaker) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + return cb.Execute(func() (interface{}, error) { return next(ctx, request) }) + } + } +} diff --git a/endpoint/circuitbreaker/gobreaker_test.go b/endpoint/circuitbreaker/gobreaker_test.go new file mode 100644 index 0000000..788a812 --- /dev/null +++ b/endpoint/circuitbreaker/gobreaker_test.go @@ -0,0 +1,19 @@ +package circuitbreaker_test + +import ( + "testing" + + "github.com/sony/gobreaker" + + "github.com/dreamsxin/go-kit/endpoint/circuitbreaker" +) + +func TestGobreaker(t *testing.T) { + var ( + breaker = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{})) + primeWith = 100 + shouldPass = func(n int) bool { return n <= 5 } // https://github.com/sony/gobreaker/blob/bfa846d/gobreaker.go#L76 + circuitOpenError = "circuit breaker is open" + ) + testFailingEndpoint(t, breaker, primeWith, shouldPass, 0, circuitOpenError) +} diff --git a/endpoint/circuitbreaker/handy_breaker.go b/endpoint/circuitbreaker/handy_breaker.go new file mode 100644 index 0000000..e932313 --- /dev/null +++ b/endpoint/circuitbreaker/handy_breaker.go @@ -0,0 +1,38 @@ +package circuitbreaker + +import ( + "context" + "time" + + "github.com/streadway/handy/breaker" + + "github.com/dreamsxin/go-kit/endpoint" +) + +// HandyBreaker returns an endpoint.Middleware that implements the circuit +// breaker pattern using the streadway/handy/breaker package. Only errors +// returned by the wrapped endpoint count against the circuit breaker's error +// count. +// +// See http://godoc.org/github.com/streadway/handy/breaker for more +// information. +func HandyBreaker(cb breaker.Breaker) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + if !cb.Allow() { + return nil, breaker.ErrCircuitOpen + } + + defer func(begin time.Time) { + if err == nil { + cb.Success(time.Since(begin)) + } else { + cb.Failure(time.Since(begin)) + } + }(time.Now()) + + response, err = next(ctx, request) + return + } + } +} diff --git a/endpoint/circuitbreaker/handy_breaker_test.go b/endpoint/circuitbreaker/handy_breaker_test.go new file mode 100644 index 0000000..7e1a6f6 --- /dev/null +++ b/endpoint/circuitbreaker/handy_breaker_test.go @@ -0,0 +1,20 @@ +package circuitbreaker_test + +import ( + "testing" + + handybreaker "github.com/streadway/handy/breaker" + + "github.com/dreamsxin/go-kit/endpoint/circuitbreaker" +) + +func TestHandyBreaker(t *testing.T) { + var ( + failureRatio = 0.05 + breaker = circuitbreaker.HandyBreaker(handybreaker.NewBreaker(failureRatio)) + primeWith = handybreaker.DefaultMinObservations * 10 + shouldPass = func(n int) bool { return (float64(n) / float64(primeWith+n)) <= failureRatio } + openCircuitError = handybreaker.ErrCircuitOpen.Error() + ) + testFailingEndpoint(t, breaker, primeWith, shouldPass, 0, openCircuitError) +} diff --git a/endpoint/circuitbreaker/hystrix.go b/endpoint/circuitbreaker/hystrix.go new file mode 100644 index 0000000..da086a8 --- /dev/null +++ b/endpoint/circuitbreaker/hystrix.go @@ -0,0 +1,31 @@ +package circuitbreaker + +import ( + "context" + + "github.com/afex/hystrix-go/hystrix" + + "github.com/dreamsxin/go-kit/endpoint" +) + +// Hystrix returns an endpoint.Middleware that implements the circuit +// breaker pattern using the afex/hystrix-go package. +// +// When using this circuit breaker, please configure your commands separately. +// +// See https://godoc.org/github.com/afex/hystrix-go/hystrix for more +// information. +func Hystrix(commandName string) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + var resp interface{} + if err := hystrix.Do(commandName, func() (err error) { + resp, err = next(ctx, request) + return err + }, nil); err != nil { + return nil, err + } + return resp, nil + } + } +} diff --git a/endpoint/circuitbreaker/hystrix_test.go b/endpoint/circuitbreaker/hystrix_test.go new file mode 100644 index 0000000..80b133a --- /dev/null +++ b/endpoint/circuitbreaker/hystrix_test.go @@ -0,0 +1,40 @@ +package circuitbreaker_test + +import ( + "io/ioutil" + stdlog "log" + "testing" + "time" + + "github.com/afex/hystrix-go/hystrix" + + "github.com/dreamsxin/go-kit/endpoint/circuitbreaker" +) + +func TestHystrix(t *testing.T) { + stdlog.SetOutput(ioutil.Discard) + + const ( + commandName = "my-endpoint" + errorPercent = 5 + maxConcurrent = 1000 + ) + hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{ + ErrorPercentThreshold: errorPercent, + MaxConcurrentRequests: maxConcurrent, + }) + + var ( + breaker = circuitbreaker.Hystrix(commandName) + primeWith = hystrix.DefaultVolumeThreshold * 2 + shouldPass = func(n int) bool { return (float64(n) / float64(primeWith+n)) <= (float64(errorPercent-1) / 100.0) } + openCircuitError = hystrix.ErrCircuitOpen.Error() + ) + + // hystrix-go uses buffered channels to receive reports on request success/failure, + // and so is basically impossible to test deterministically. We have to make sure + // the report buffer is emptied, by injecting a sleep between each invocation. + requestDelay := 5 * time.Millisecond + + testFailingEndpoint(t, breaker, primeWith, shouldPass, requestDelay, openCircuitError) +} diff --git a/endpoint/circuitbreaker/util_test.go b/endpoint/circuitbreaker/util_test.go new file mode 100644 index 0000000..ca51db3 --- /dev/null +++ b/endpoint/circuitbreaker/util_test.go @@ -0,0 +1,75 @@ +package circuitbreaker_test + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/dreamsxin/go-kit/endpoint" +) + +func testFailingEndpoint( + t *testing.T, + breaker endpoint.Middleware, + primeWith int, + shouldPass func(int) bool, + requestDelay time.Duration, + openCircuitError string, +) { + _, file, line, _ := runtime.Caller(1) + caller := fmt.Sprintf("%s:%d", filepath.Base(file), line) + + // Create a mock endpoint and wrap it with the breaker. + m := mock{} + var e endpoint.Endpoint + e = m.endpoint + e = breaker(e) + + // Prime the endpoint with successful requests. + for i := 0; i < primeWith; i++ { + if _, err := e(context.Background(), struct{}{}); err != nil { + t.Fatalf("%s: during priming, got error: %v", caller, err) + } + time.Sleep(requestDelay) + } + + // Switch the endpoint to start throwing errors. + m.err = errors.New("tragedy+disaster") + m.through = 0 + + // The first several should be allowed through and yield our error. + for i := 0; shouldPass(i); i++ { + if _, err := e(context.Background(), struct{}{}); err != m.err { + t.Fatalf("%s: want %v, have %v", caller, m.err, err) + } + time.Sleep(requestDelay) + } + through := m.through + + // But the rest should be blocked by an open circuit. + for i := 0; i < 10; i++ { + if _, err := e(context.Background(), struct{}{}); err.Error() != openCircuitError { + t.Fatalf("%s: want %q, have %q", caller, openCircuitError, err.Error()) + } + time.Sleep(requestDelay) + } + + // Make sure none of those got through. + if want, have := through, m.through; want != have { + t.Errorf("%s: want %d, have %d", caller, want, have) + } +} + +type mock struct { + through int + err error +} + +func (m *mock) endpoint(context.Context, interface{}) (interface{}, error) { + m.through++ + return struct{}{}, m.err +} diff --git a/go.mod b/go.mod index f5788c4..6f9e9c3 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,11 @@ module github.com/dreamsxin/go-kit go 1.21.4 require ( + github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/google/go-cmp v0.6.0 github.com/hashicorp/consul/api v1.27.0 + github.com/sony/gobreaker v1.0.0 + github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e go.uber.org/zap v1.26.0 google.golang.org/grpc v1.61.0 google.golang.org/protobuf v1.31.0 @@ -26,6 +29,7 @@ require ( github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/smartystreets/goconvey v1.8.1 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect golang.org/x/net v0.18.0 // indirect diff --git a/go.sum b/go.sum index 7ae6e8d..30b5d92 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= +github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -46,6 +48,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/hashicorp/consul/api v1.27.0 h1:gmJ6DPKQog1426xsdmgk5iqDyoRiNc+ipBdJOqKQFjc= github.com/hashicorp/consul/api v1.27.0/go.mod h1:JkekNRSou9lANFdt+4IKx3Za7XY0JzzpQjEb4Ivo1c8= github.com/hashicorp/consul/sdk v0.15.1 h1:kKIGxc7CZtflcF5DLfHeq7rOQmRq3vk7kwISN9bif8Q= @@ -92,6 +96,8 @@ github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -156,6 +162,14 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY= +github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec= +github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= +github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e h1:mOtuXaRAbVZsxAHVdPR3IjfmN8T1h2iczJLynhLybf8= +github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=