Skip to content

Commit

Permalink
Merge pull request #269 from dvonthenen/fix-ws-self-hosted
Browse files Browse the repository at this point in the history
Fix Using `ws://` With Self Hosted
  • Loading branch information
davidvonthenen authored Nov 7, 2024
2 parents 85b9256 + 268d1d2 commit 7d9d003
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 49 deletions.
4 changes: 3 additions & 1 deletion pkg/api/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options

// check if the host has a protocol
r := regexp.MustCompile(`^(https?)://(.+)$`)
if apiType == APITypeLive {
if apiType == APITypeLive || apiType == APITypeSpeakStream {
r = regexp.MustCompile(`^(wss?)://(.+)$`)
}

Expand Down Expand Up @@ -98,6 +98,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
// construct the full path and substitute the version and all query parameters
fullpath := fmt.Sprintf("%%s/%s", path)
completeFullpath := fmt.Sprintf(fullpath, append([]interface{}{version}, args...)...)
klog.V(3).Infof("completeFullpath: %s\n", completeFullpath)

// construct the URL
var u url.URL
Expand All @@ -106,6 +107,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
} else {
u = url.URL{Scheme: protocol, Host: host, Path: completeFullpath}
}
klog.V(3).Infof("URI final: %s\n", u.String())

return u.String(), nil
}
128 changes: 82 additions & 46 deletions pkg/client/common/v1/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (c *WSClient) internalConnect() *websocket.Conn {
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false)
}

//nolint:funlen // this is a complex function. keep as is
//nolint:funlen,gocyclo // this is a complex function. keep as is
func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
klog.V(7).Infof("live.internalConnectWithCancel() ENTER\n")
klog.V(7).Infof("common.internalConnectWithCancel() ENTER\n")

// set the context
c.ctx = ctx
Expand All @@ -94,13 +94,16 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
if lock {
klog.V(3).Infof("Locking connection mutex\n")
c.muConn.Lock()
defer c.muConn.Unlock()
}

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil
}

Expand All @@ -109,32 +112,36 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
select {
case <-c.ctx.Done():
klog.V(1).Infof("Connection is not valid\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil
default:
klog.V(7).Infof("Connection is good. Return object.")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return c.wsconn
}
} else {
select {
case <-c.ctx.Done():
klog.V(1).Infof("Context is not valid. Has been canceled.\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil
default:
klog.V(3).Infof("Context is still valid. Retry...\n")
}
}

dialer := websocket.Dialer{
HandshakeTimeout: 45 * time.Second,
/* #nosec G402 */
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
RedirectService: c.cOptions.RedirectService,
SkipServerAuth: c.cOptions.SkipServerAuth,
}

// set websocket headers
myHeader := http.Header{}

Expand Down Expand Up @@ -175,10 +182,30 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
if err != nil {
klog.V(1).Infof("GetURL failed. Err: %v\n", err)
klog.V(7).Infof("internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil // no point in retrying because this is going to fail on every retry
}
klog.V(5).Infof("Connecting to %s\n", url)

// if host starts with "ws://", then disable TLS
var dialer websocket.Dialer
if url[:5] == "ws://" {
dialer = websocket.Dialer{
HandshakeTimeout: 15 * time.Second,
RedirectService: c.cOptions.RedirectService,
}
} else {
dialer = websocket.Dialer{
HandshakeTimeout: 15 * time.Second,
/* #nosec G402 */
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
RedirectService: c.cOptions.RedirectService,
SkipServerAuth: c.cOptions.SkipServerAuth,
}
}
// perform the websocket connection
ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
if res != nil {
Expand All @@ -197,6 +224,10 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont

// kick off threads to listen for messages and ping/keepalive
go c.listen()
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}

// start WS specific items
(*c.processMessages).Start()
Expand All @@ -210,21 +241,26 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
}

klog.V(3).Infof("WebSocket Connection Successful!")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")

return c.wsconn
}

// if we get here, we failed to connect
klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host)
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")

if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}

return nil
}

//nolint:funlen // this is a complex function. keep as is
func (c *WSClient) listen() {
klog.V(6).Infof("live.listen() ENTER\n")
klog.V(6).Infof("common.listen() ENTER\n")

defer func() {
if r := recover(); r != nil {
Expand All @@ -240,7 +276,7 @@ func (c *WSClient) listen() {
// fatal close
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
}
}()
Expand All @@ -256,7 +292,7 @@ func (c *WSClient) listen() {
c.muConn.Unlock()

klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
}

Expand All @@ -275,15 +311,15 @@ func (c *WSClient) listen() {
// graceful close
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// fatal close
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
Expand All @@ -297,7 +333,7 @@ func (c *WSClient) listen() {
// fatal close
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case strings.Contains(errStr, "Deepgram"):
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)
Expand All @@ -311,7 +347,7 @@ func (c *WSClient) listen() {
// close the connection
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case (err == io.EOF || err == io.ErrUnexpectedEOF):
klog.V(3).Infof("stream object EOF\n")
Expand All @@ -325,7 +361,7 @@ func (c *WSClient) listen() {
// close the connection
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)
Expand All @@ -339,7 +375,7 @@ func (c *WSClient) listen() {
// close the connection
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
}
}
Expand All @@ -359,7 +395,7 @@ func (c *WSClient) listen() {

// WriteBinary writes binary data to the websocket server
func (c *WSClient) WriteBinary(byData []byte) error {
klog.V(7).Infof("live.WriteBinary() ENTER\n")
klog.V(7).Infof("common.WriteBinary() ENTER\n")

// doing a write, need to lock
c.muConn.Lock()
Expand All @@ -370,7 +406,7 @@ func (c *WSClient) WriteBinary(byData []byte) error {
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
klog.V(7).Infof("common.WriteBinary() LEAVE\n")

return err
}
Expand All @@ -380,28 +416,28 @@ func (c *WSClient) WriteBinary(byData []byte) error {
byData,
); err != nil {
klog.V(1).Infof("WriteBinary WriteMessage failed. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
klog.V(7).Infof("common.WriteBinary() LEAVE\n")
return err
}

klog.V(7).Infof("WriteBinary Successful\n")
klog.V(7).Infof("payload: %x\n", byData)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
klog.V(7).Infof("common.WriteBinary() LEAVE\n")

return nil
}

/*
WriteJSON writes a JSON control payload to the websocket server. These are control messages for
managing the live transcription session on the Deepgram server.
managing the websocket connection.
*/
func (c *WSClient) WriteJSON(payload interface{}) error {
klog.V(6).Infof("live.WriteJSON() ENTER\n")
klog.V(6).Infof("common.WriteJSON() ENTER\n")

byData, err := json.Marshal(payload)
if err != nil {
klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err)
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
return err
}

Expand All @@ -414,7 +450,7 @@ func (c *WSClient) WriteJSON(payload interface{}) error {
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")

return err
}
Expand All @@ -424,20 +460,20 @@ func (c *WSClient) WriteJSON(payload interface{}) error {
byData,
); err != nil {
klog.V(1).Infof("WriteJSON WriteMessage failed. Err: %v\n", err)
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
return err
}

klog.V(4).Infof("live.WriteJSON() Succeeded\n")
klog.V(4).Infof("common.WriteJSON() Succeeded\n")
klog.V(6).Infof("payload: %s\n", string(byData))
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")

return nil
}

// closeStream sends an application level message to Deepgram
func (c *WSClient) closeStream(lock bool) error {
klog.V(7).Infof("live.closeStream() ENTER\n")
klog.V(7).Infof("common.closeStream() ENTER\n")

// doing a write, need to lock
if lock {
Expand All @@ -456,20 +492,20 @@ func (c *WSClient) closeStream(lock bool) error {

if err != nil {
klog.V(1).Infof("WriteMessage failed. Err: %v\n", err)
klog.V(7).Infof("live.closeStream() LEAVE\n")
klog.V(7).Infof("common.closeStream() LEAVE\n")

return err
}

klog.V(4).Infof("closeStream Succeeded\n")
klog.V(7).Infof("live.closeStream() LEAVE\n")
klog.V(7).Infof("common.closeStream() LEAVE\n")

return err
}

// normalClosure sends a normal closure message to the server
func (c *WSClient) normalClosure(lock bool) error {
klog.V(7).Infof("live.normalClosure() ENTER\n")
klog.V(7).Infof("common.normalClosure() ENTER\n")

// doing a write, need to lock
if lock {
Expand All @@ -481,7 +517,7 @@ func (c *WSClient) normalClosure(lock bool) error {
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.normalClosure() LEAVE\n")
klog.V(7).Infof("common.normalClosure() LEAVE\n")

return err
}
Expand All @@ -496,7 +532,7 @@ func (c *WSClient) normalClosure(lock bool) error {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err)
}

klog.V(7).Infof("live.normalClosure() LEAVE\n")
klog.V(7).Infof("common.normalClosure() LEAVE\n")

return err
}
Expand All @@ -514,7 +550,7 @@ func (c *WSClient) Stop() {

// closeWs closes the websocket connection
func (c *WSClient) closeWs(fatal bool, perm bool) {
klog.V(6).Infof("live.closeWs() closing channels...\n")
klog.V(6).Infof("common.closeWs() closing channels...\n")

// doing a write, need to lock
c.muConn.Lock()
Expand Down Expand Up @@ -555,8 +591,8 @@ func (c *WSClient) closeWs(fatal bool, perm bool) {
c.wsconn = nil
}

klog.V(4).Infof("live.closeWs() Succeeded\n")
klog.V(6).Infof("live.closeWs() LEAVE\n")
klog.V(4).Infof("common.closeWs() Succeeded\n")
klog.V(6).Infof("common.closeWs() LEAVE\n")
}

// sendError sends an error message to the callback handler
Expand Down
Loading

0 comments on commit 7d9d003

Please sign in to comment.