diff --git a/client/comms/wsconn.go b/client/comms/wsconn.go index 96ffabd082..903aad8e59 100644 --- a/client/comms/wsconn.go +++ b/client/comms/wsconn.go @@ -98,6 +98,7 @@ type WsConn interface { RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error Connect(ctx context.Context) (*sync.WaitGroup, error) MessageSource() <-chan *msgjson.Message + UpdateURL(string) } // When the DEX sends a request to the client, a responseHandler is created @@ -161,6 +162,7 @@ type wsConn struct { cfg *WsCfg tlsCfg *tls.Config readCh chan *msgjson.Message + urlV atomic.Value // string wsMtx sync.Mutex ws *websocket.Conn @@ -203,14 +205,25 @@ func NewWsConn(cfg *WsCfg) (WsConn, error) { ServerName: uri.Hostname(), } - return &wsConn{ + conn := &wsConn{ cfg: cfg, log: cfg.Logger, tlsCfg: tlsConfig, readCh: make(chan *msgjson.Message, readBuffSize), respHandlers: make(map[uint64]*responseHandler), reconnectCh: make(chan struct{}, 1), - }, nil + } + conn.urlV.Store(cfg.URL) + + return conn, nil +} + +func (conn *wsConn) UpdateURL(uri string) { + conn.urlV.Store(uri) +} + +func (conn *wsConn) url() string { + return conn.urlV.Load().(string) } // IsDown indicates if the connection is known to be down. @@ -240,7 +253,7 @@ func (conn *wsConn) connect(ctx context.Context) error { dialer.Proxy = http.ProxyFromEnvironment } - ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, conn.cfg.ConnectHeaders) + ws, _, err := dialer.DialContext(ctx, conn.url(), conn.cfg.ConnectHeaders) if err != nil { if isErrorInvalidCert(err) { conn.setConnectionStatus(InvalidCert) @@ -331,7 +344,7 @@ func (conn *wsConn) handleReadError(err error) { var netErr net.Error if errors.As(err, &netErr) && netErr.Timeout() { - conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL) + conn.log.Errorf("Read timeout on connection to %s.", conn.url()) reconnect() return } @@ -457,11 +470,11 @@ func (conn *wsConn) keepAlive(ctx context.Context) { return } - conn.log.Infof("Attempting to reconnect to %s...", conn.cfg.URL) + conn.log.Infof("Attempting to reconnect to %s...", conn.url()) err := conn.connect(ctx) if err != nil { conn.log.Errorf("Reconnect failed. Scheduling reconnect to %s in %.1f seconds.", - conn.cfg.URL, rcInt.Seconds()) + conn.url(), rcInt.Seconds()) time.AfterFunc(rcInt, func() { conn.reconnectCh <- struct{}{} }) diff --git a/client/core/core_test.go b/client/core/core_test.go index 95a73ddfb1..5162c80baf 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -347,6 +347,8 @@ func (conn *TWebsocket) Connect(context.Context) (*sync.WaitGroup, error) { return &sync.WaitGroup{}, conn.connectErr } +func (conn *TWebsocket) UpdateURL(string) {} + type TDB struct { updateWalletErr error acct *db.AccountInfo diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index de90cd72ea..dcd6a703b5 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -1618,7 +1618,7 @@ func (bnc *binance) getOrderbookSnapshot(ctx context.Context, mktSymbol string) // subscribeToAdditionalMarketDataStream is called when a new market is // subscribed to after the market data stream connection has already been // established. -func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, baseID, quoteID uint32) error { +func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, baseID, quoteID uint32) (err error) { baseCfg, quoteCfg, err := bncAssetCfgs(baseID, quoteID) if err != nil { return fmt.Errorf("error getting asset cfg for %d: %w", baseID, err) @@ -1627,6 +1627,10 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b mktID := binanceMktID(baseCfg, quoteCfg) streamID := marketDataStreamID(mktID) + defer func() { + bnc.marketStream.UpdateURL(bnc.streamURL()) + }() + bnc.booksMtx.Lock() defer bnc.booksMtx.Unlock() @@ -1662,6 +1666,10 @@ func (bnc *binance) streams() []string { return streamNames } +func (bnc *binance) streamURL() string { + return fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/")) +} + // checkSubs will query binance for current market subscriptions and compare // that to what subscriptions we should have. If there is a discrepancy a // warning is logged and the market subbed or unsubbed. @@ -1756,8 +1764,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote reconnectC := make(chan struct{}) checkSubsC := make(chan struct{}) - newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { - addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/")) + newConnection := func() (*dex.ConnectionMaster, error) { // Need to send key but not signature connectEventFunc := func(cs comms.ConnectionStatus) { if cs != comms.Disconnected && cs != comms.Connected { @@ -1776,7 +1783,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote } } conn, err := comms.NewWsConn(&comms.WsCfg{ - URL: addr, + URL: bnc.streamURL(), // Binance Docs: The websocket server will send a ping frame every 3 // minutes. If the websocket server does not receive a pong frame // back from the connection within a 10 minute period, the connection @@ -1795,16 +1802,16 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote RawHandler: bnc.handleMarketDataNote, }) if err != nil { - return nil, nil, err + return nil, err } bnc.marketStream = conn cm := dex.NewConnectionMaster(conn) if err = cm.ConnectOnce(ctx); err != nil { - return nil, nil, fmt.Errorf("websocketHandler remote connect: %v", err) + return nil, fmt.Errorf("websocketHandler remote connect: %v", err) } - return conn, cm, nil + return cm, nil } // Add the initial book to the books map @@ -1822,13 +1829,11 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote bnc.booksMtx.Unlock() // Create initial connection to the market data stream - conn, cm, err := newConnection() + cm, err := newConnection() if err != nil { return fmt.Errorf("error connecting to market data stream : %v", err) } - bnc.marketStream = conn - book.sync(ctx) // Start a goroutine to reconnect every 12 hours @@ -1836,9 +1841,8 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote reconnect := func() error { bnc.marketStreamMtx.Lock() defer bnc.marketStreamMtx.Unlock() - oldCm := cm - conn, cm, err = newConnection() + cm, err = newConnection() if err != nil { return err } @@ -1846,8 +1850,6 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote if oldCm != nil { oldCm.Disconnect() } - - bnc.marketStream = conn return nil } @@ -1922,6 +1924,8 @@ func (bnc *binance) UnsubscribeMarket(baseID, quoteID uint32) (err error) { defer func() { bnc.booksMtx.Unlock() + conn.UpdateURL(bnc.streamURL()) + if closer != nil { closer.Disconnect() }