Skip to content

Commit

Permalink
binance: update book stream url (#3015)
Browse files Browse the repository at this point in the history
* update binance book stream url
  • Loading branch information
buck54321 authored Oct 11, 2024
1 parent e291957 commit aee9af7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
25 changes: 19 additions & 6 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type WsConn interface {
RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error
Connect(ctx context.Context) (*sync.WaitGroup, error)
MessageSource() <-chan *msgjson.Message
UpdateURL(string)
}

// When the DEX sends a request to the client, a responseHandler is created
Expand Down Expand Up @@ -161,6 +162,7 @@ type wsConn struct {
cfg *WsCfg
tlsCfg *tls.Config
readCh chan *msgjson.Message
urlV atomic.Value // string

wsMtx sync.Mutex
ws *websocket.Conn
Expand Down Expand Up @@ -203,14 +205,25 @@ func NewWsConn(cfg *WsCfg) (WsConn, error) {
ServerName: uri.Hostname(),
}

return &wsConn{
conn := &wsConn{
cfg: cfg,
log: cfg.Logger,
tlsCfg: tlsConfig,
readCh: make(chan *msgjson.Message, readBuffSize),
respHandlers: make(map[uint64]*responseHandler),
reconnectCh: make(chan struct{}, 1),
}, nil
}
conn.urlV.Store(cfg.URL)

return conn, nil
}

func (conn *wsConn) UpdateURL(uri string) {
conn.urlV.Store(uri)
}

func (conn *wsConn) url() string {
return conn.urlV.Load().(string)
}

// IsDown indicates if the connection is known to be down.
Expand Down Expand Up @@ -240,7 +253,7 @@ func (conn *wsConn) connect(ctx context.Context) error {
dialer.Proxy = http.ProxyFromEnvironment
}

ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, conn.cfg.ConnectHeaders)
ws, _, err := dialer.DialContext(ctx, conn.url(), conn.cfg.ConnectHeaders)
if err != nil {
if isErrorInvalidCert(err) {
conn.setConnectionStatus(InvalidCert)
Expand Down Expand Up @@ -331,7 +344,7 @@ func (conn *wsConn) handleReadError(err error) {

var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
conn.log.Errorf("Read timeout on connection to %s.", conn.url())
reconnect()
return
}
Expand Down Expand Up @@ -457,11 +470,11 @@ func (conn *wsConn) keepAlive(ctx context.Context) {
return
}

conn.log.Infof("Attempting to reconnect to %s...", conn.cfg.URL)
conn.log.Infof("Attempting to reconnect to %s...", conn.url())
err := conn.connect(ctx)
if err != nil {
conn.log.Errorf("Reconnect failed. Scheduling reconnect to %s in %.1f seconds.",
conn.cfg.URL, rcInt.Seconds())
conn.url(), rcInt.Seconds())
time.AfterFunc(rcInt, func() {
conn.reconnectCh <- struct{}{}
})
Expand Down
2 changes: 2 additions & 0 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ func (conn *TWebsocket) Connect(context.Context) (*sync.WaitGroup, error) {
return &sync.WaitGroup{}, conn.connectErr
}

func (conn *TWebsocket) UpdateURL(string) {}

type TDB struct {
updateWalletErr error
acct *db.AccountInfo
Expand Down
32 changes: 18 additions & 14 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,7 @@ func (bnc *binance) getOrderbookSnapshot(ctx context.Context, mktSymbol string)
// subscribeToAdditionalMarketDataStream is called when a new market is
// subscribed to after the market data stream connection has already been
// established.
func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, baseID, quoteID uint32) error {
func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, baseID, quoteID uint32) (err error) {
baseCfg, quoteCfg, err := bncAssetCfgs(baseID, quoteID)
if err != nil {
return fmt.Errorf("error getting asset cfg for %d: %w", baseID, err)
Expand All @@ -1627,6 +1627,10 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b
mktID := binanceMktID(baseCfg, quoteCfg)
streamID := marketDataStreamID(mktID)

defer func() {
bnc.marketStream.UpdateURL(bnc.streamURL())
}()

bnc.booksMtx.Lock()
defer bnc.booksMtx.Unlock()

Expand Down Expand Up @@ -1662,6 +1666,10 @@ func (bnc *binance) streams() []string {
return streamNames
}

func (bnc *binance) streamURL() string {
return fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
}

// checkSubs will query binance for current market subscriptions and compare
// that to what subscriptions we should have. If there is a discrepancy a
// warning is logged and the market subbed or unsubbed.
Expand Down Expand Up @@ -1756,8 +1764,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
reconnectC := make(chan struct{})
checkSubsC := make(chan struct{})

newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
newConnection := func() (*dex.ConnectionMaster, error) {
// Need to send key but not signature
connectEventFunc := func(cs comms.ConnectionStatus) {
if cs != comms.Disconnected && cs != comms.Connected {
Expand All @@ -1776,7 +1783,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
}
}
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
URL: bnc.streamURL(),
// Binance Docs: The websocket server will send a ping frame every 3
// minutes. If the websocket server does not receive a pong frame
// back from the connection within a 10 minute period, the connection
Expand All @@ -1795,16 +1802,16 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
RawHandler: bnc.handleMarketDataNote,
})
if err != nil {
return nil, nil, err
return nil, err
}

bnc.marketStream = conn
cm := dex.NewConnectionMaster(conn)
if err = cm.ConnectOnce(ctx); err != nil {
return nil, nil, fmt.Errorf("websocketHandler remote connect: %v", err)
return nil, fmt.Errorf("websocketHandler remote connect: %v", err)
}

return conn, cm, nil
return cm, nil
}

// Add the initial book to the books map
Expand All @@ -1822,32 +1829,27 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
bnc.booksMtx.Unlock()

// Create initial connection to the market data stream
conn, cm, err := newConnection()
cm, err := newConnection()
if err != nil {
return fmt.Errorf("error connecting to market data stream : %v", err)
}

bnc.marketStream = conn

book.sync(ctx)

// Start a goroutine to reconnect every 12 hours
go func() {
reconnect := func() error {
bnc.marketStreamMtx.Lock()
defer bnc.marketStreamMtx.Unlock()

oldCm := cm
conn, cm, err = newConnection()
cm, err = newConnection()
if err != nil {
return err
}

if oldCm != nil {
oldCm.Disconnect()
}

bnc.marketStream = conn
return nil
}

Expand Down Expand Up @@ -1922,6 +1924,8 @@ func (bnc *binance) UnsubscribeMarket(baseID, quoteID uint32) (err error) {
defer func() {
bnc.booksMtx.Unlock()

conn.UpdateURL(bnc.streamURL())

if closer != nil {
closer.Disconnect()
}
Expand Down

0 comments on commit aee9af7

Please sign in to comment.