Skip to content

Commit

Permalink
reconnect jitter configurable (#467)
Browse files Browse the repository at this point in the history
* reconnect jitter configurable

* updated docs: reconnect jitter configurable

* Update pkg/common/nats/conn.go

Co-authored-by: Dan <5727701+dan-j@users.noreply.github.com>

---------

Co-authored-by: Dan <5727701+dan-j@users.noreply.github.com>
  • Loading branch information
astelmashenko and dan-j authored Jan 11, 2024
1 parent c6fb1ff commit 44752ad
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 5 deletions.
8 changes: 5 additions & 3 deletions docs/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ data:
secret:
name: "" # a secret containing a `ca.crt` entry.
connOpts:
retryOnFailedConnect: true # should it reconnect on failed connection
maxReconnects: 50 # max reconnect attempts
reconnectWait: 2000 # delay between reconnect attempts
retryOnFailedConnect: true # should it reconnect on failed connection
maxReconnects: 50 # max reconnect attempts
reconnectWait: 2000 # delay between reconnect attempts
reconnectJitterMilliseconds: 100 # upper bound of a random delay added ReconnectWait
reconnectJitterTLSMilliseconds: 1000 # upper bound of a random delay added ReconnectWait
```
## JetStream integration
Expand Down
3 changes: 2 additions & 1 deletion examples/config-nats.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ data:
retryOnFailedConnect: true
maxReconnects: 5
reconnectWaitMilliseconds: 2000
reconnectJitterMilliseconds: 100
reconnectJitterTLSMilliseconds: 1000
4 changes: 4 additions & 0 deletions pkg/common/config/eventingnatsconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ type ConnOpts struct {
RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty"`
// ReconnectWaitMilliseconds time between reconnects in milliseconds
ReconnectWaitMilliseconds int `json:"reconnectWaitMilliseconds,omitempty"`
// ReconnectJitterMilliseconds Option to set the upper bound of a random delay added ReconnectWait
ReconnectJitterMilliseconds int `json:"reconnectJitterMilliseconds,omitempty"`
// ReconnectJitterTLSMilliseconds Option to set the upper bound of a random delay added ReconnectWait
ReconnectJitterTLSMilliseconds int `json:"reconnectJitterTLSMilliseconds,omitempty"`
}
17 changes: 16 additions & 1 deletion pkg/common/nats/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ import (
"knative.dev/eventing-natss/pkg/common/constants"
)

const (
defaultReconnectJitter = time.Duration(100) * time.Millisecond
defaultReconnectJitterTLS = time.Duration(1000) * time.Millisecond
)

var (
ErrBadCredentialFileOption = errors.New("bad auth.credentialFile option")
ErrBadMTLSOption = errors.New("bad auth.tls option")
Expand Down Expand Up @@ -82,6 +87,9 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*
// reconnection options
if config.ConnOpts != nil && config.ConnOpts.RetryOnFailedConnect {
reconnectWait := time.Duration(config.ConnOpts.ReconnectWaitMilliseconds) * time.Millisecond
reconnectJitter := defaultJitterIfEmpty(config.ConnOpts.ReconnectJitterMilliseconds, defaultReconnectJitter)
reconnectJitterTLS := defaultJitterIfEmpty(config.ConnOpts.ReconnectJitterTLSMilliseconds, defaultReconnectJitterTLS)

logger.Infof("Configuring retries: %#v", config.ConnOpts)
opts = append(opts, nats.RetryOnFailedConnect(config.ConnOpts.RetryOnFailedConnect))
opts = append(opts, nats.ReconnectWait(reconnectWait))
Expand All @@ -93,7 +101,7 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*
logger.Debugf("Reconnect attempts left: %d", config.ConnOpts.MaxReconnects-attempts)
return reconnectWait
}))
opts = append(opts, nats.ReconnectJitter(1000, time.Millisecond))
opts = append(opts, nats.ReconnectJitter(reconnectJitter, reconnectJitterTLS))
opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
logger.Warnf("Disconnected from JSM: err=%v", err)
logger.Warnf("Disconnected from JSM: will attempt reconnects for %d", config.ConnOpts.MaxReconnects)
Expand All @@ -108,6 +116,13 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*
return nats.Connect(url, opts...)
}

func defaultJitterIfEmpty(jitter int, defaultValue time.Duration) time.Duration {
if jitter == 0 {
return defaultValue
}
return time.Duration(jitter) * time.Millisecond
}

func buildAuthOption(ctx context.Context, config commonconfig.ENConfigAuth, secrets clientsetcorev1.SecretInterface) ([]nats.Option, error) {
opts := make([]nats.Option, 0, 2)
if config.CredentialFile != nil {
Expand Down

0 comments on commit 44752ad

Please sign in to comment.