Skip to content

Commit

Permalink
client/comms,binance: Update ws connection URL
Browse files Browse the repository at this point in the history
This updates the ws connection interface to allow for updating of the URL.
This is useful for the connection to Binance. If additional markets are
subscribed after the initial connection to the market streams API,
in the event of an error, the websocket connection will reconnect using
the initial URL, and the additional markets will only be resubscribed after
a list subscriptions check is done. With this change, all the markets will
be subscribed during a reconnection.
  • Loading branch information
martonp committed Oct 11, 2024
1 parent 83f6d4e commit f164da2
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
26 changes: 20 additions & 6 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type WsConn interface {
RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error
Connect(ctx context.Context) (*sync.WaitGroup, error)
MessageSource() <-chan *msgjson.Message
UpdateURL(url string)
}

// When the DEX sends a request to the client, a responseHandler is created
Expand Down Expand Up @@ -161,6 +162,7 @@ type wsConn struct {
cfg *WsCfg
tlsCfg *tls.Config
readCh chan *msgjson.Message
urlV atomic.Value

wsMtx sync.Mutex
ws *websocket.Conn
Expand Down Expand Up @@ -203,14 +205,26 @@ func NewWsConn(cfg *WsCfg) (WsConn, error) {
ServerName: uri.Hostname(),
}

return &wsConn{
conn := &wsConn{
cfg: cfg,
log: cfg.Logger,
tlsCfg: tlsConfig,
readCh: make(chan *msgjson.Message, readBuffSize),
respHandlers: make(map[uint64]*responseHandler),
reconnectCh: make(chan struct{}, 1),
}, nil
}

conn.urlV.Store(cfg.URL)

return conn, nil
}

func (conn *wsConn) UpdateURL(uri string) {
conn.urlV.Store(uri)
}

func (conn *wsConn) url() string {
return conn.urlV.Load().(string)
}

// IsDown indicates if the connection is known to be down.
Expand Down Expand Up @@ -240,7 +254,7 @@ func (conn *wsConn) connect(ctx context.Context) error {
dialer.Proxy = http.ProxyFromEnvironment
}

ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, conn.cfg.ConnectHeaders)
ws, _, err := dialer.DialContext(ctx, conn.url(), conn.cfg.ConnectHeaders)
if err != nil {
if isErrorInvalidCert(err) {
conn.setConnectionStatus(InvalidCert)
Expand Down Expand Up @@ -331,7 +345,7 @@ func (conn *wsConn) handleReadError(err error) {

var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
conn.log.Errorf("Read timeout on connection to %s.", conn.url())
reconnect()
return
}
Expand Down Expand Up @@ -457,11 +471,11 @@ func (conn *wsConn) keepAlive(ctx context.Context) {
return
}

conn.log.Infof("Attempting to reconnect to %s...", conn.cfg.URL)
conn.log.Infof("Attempting to reconnect to %s...", conn.url())
err := conn.connect(ctx)
if err != nil {
conn.log.Errorf("Reconnect failed. Scheduling reconnect to %s in %.1f seconds.",
conn.cfg.URL, rcInt.Seconds())
conn.url(), rcInt.Seconds())
time.AfterFunc(rcInt, func() {
conn.reconnectCh <- struct{}{}
})
Expand Down
18 changes: 15 additions & 3 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,10 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b
streamID := marketDataStreamID(mktID)

bnc.booksMtx.Lock()
defer bnc.booksMtx.Unlock()
defer func() {
bnc.booksMtx.Unlock()
bnc.marketStream.UpdateURL(bnc.marketStreamsURL())
}()

book, found := bnc.books[mktID]
if found {
Expand Down Expand Up @@ -1747,6 +1750,14 @@ out:
return nil
}

// marketStreamsURL returns the URL for the market data stream using all the
// currently subscribed markets.
//
// bnc.booksMtx MUST NOT be held when calling this function.
func (bnc *binance) marketStreamsURL() string {
return bnc.wsURL + "/stream?streams=" + strings.Join(bnc.streams(), "/")
}

// connectToMarketDataStream is called when the first market is subscribed to.
// It creates a connection to the market data stream and starts a goroutine
// to reconnect every 12 hours, as Binance will close the stream every 24
Expand All @@ -1757,7 +1768,6 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
checkSubsC := make(chan struct{})

newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
// Need to send key but not signature
connectEventFunc := func(cs comms.ConnectionStatus) {
if cs != comms.Disconnected && cs != comms.Connected {
Expand All @@ -1776,7 +1786,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
}
}
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
URL: bnc.marketStreamsURL(),
// Binance Docs: The websocket server will send a ping frame every 3
// minutes. If the websocket server does not receive a pong frame
// back from the connection within a 10 minute period, the connection
Expand Down Expand Up @@ -1929,6 +1939,8 @@ func (bnc *binance) UnsubscribeMarket(baseID, quoteID uint32) (err error) {
if unsubscribe {
if err := bnc.subUnsubDepth(false, streamID); err != nil {
bnc.log.Errorf("error unsubscribing from market data stream", err)
} else {
bnc.marketStream.UpdateURL(bnc.marketStreamsURL())
}
}
}()
Expand Down

0 comments on commit f164da2

Please sign in to comment.