From ad343fe6b1bca73dbf433559109e3bf3e587bd34 Mon Sep 17 00:00:00 2001 From: riyafa Date: Fri, 18 May 2018 09:58:14 +0530 Subject: [PATCH] Get response from server on client handshake Provides a way to access the response from the server when the client initiates a WebSocket handshake. Also renames certain classes which have Impl suffix to have Default prefix Related issue https://github.com/ballerina-platform/ballerina-lang/issues/8390 --- .../websocket/ClientHandshakeFuture.java | 53 +++++++ .../websocket/ClientHandshakeListener.java | 42 +++++ .../contract/websocket/HandshakeFuture.java | 29 ---- .../websocket/ServerHandshakeFuture.java | 48 ++++++ ...ener.java => ServerHandshakeListener.java} | 7 +- .../websocket/WebSocketClientConnector.java | 4 +- .../websocket/WebSocketInitMessage.java | 49 +++--- .../contract/websocket/WebSocketMessage.java | 32 ---- .../DefaultClientHandshakeFuture.java | 73 +++++++++ ...java => DefaultServerHandshakeFuture.java} | 26 ++-- .../DefaultWebSocketClientConnector.java | 4 +- ...Impl.java => DefaultWebSocketMessage.java} | 32 +--- ...ava => DefaultWebSocketBinaryMessage.java} | 6 +- ...java => DefaultWebSocketCloseMessage.java} | 8 +- ...va => DefaultWebSocketControlMessage.java} | 6 +- .../message/DefaultWebSocketInitMessage.java | 35 ++--- ....java => DefaultWebSocketTextMessage.java} | 8 +- .../internal/websocket/WebSocketUtil.java | 23 +-- .../WebSocketServerHandshakeHandler.java | 16 +- .../listener/WebSocketSourceHandler.java | 35 ++--- .../sender/websocket/WebSocketClient.java | 18 ++- .../websocket/WebSocketTargetHandler.java | 64 +++++--- ...tpToWsProtocolSwitchWebSocketListener.java | 6 +- .../websocket/WebSocketClientTestCase.java | 144 +++++++++--------- ...ketMessagePropertiesConnectorListener.java | 19 +-- ...ketPassthroughServerConnectorListener.java | 20 +-- .../WebSocketTestServerConnectorListener.java | 14 +- 27 files changed, 485 insertions(+), 336 deletions(-) create mode 100644 components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeFuture.java create mode 100644 components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeListener.java delete mode 100644 components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeFuture.java create mode 100644 components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeFuture.java rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/{HandshakeListener.java => ServerHandshakeListener.java} (88%) create mode 100644 components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/{HandshakeFutureImpl.java => DefaultServerHandshakeFuture.java} (62%) rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/{WebSocketMessageImpl.java => DefaultWebSocketMessage.java} (79%) rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/{WebSocketBinaryMessageImpl.java => DefaultWebSocketBinaryMessage.java} (85%) rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/{WebSocketCloseMessageImpl.java => DefaultWebSocketCloseMessage.java} (79%) rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/{WebSocketControlMessageImpl.java => DefaultWebSocketControlMessage.java} (85%) rename components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/{WebSocketTextMessageImpl.java => DefaultWebSocketTextMessage.java} (79%) diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeFuture.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeFuture.java new file mode 100644 index 000000000..034332d97 --- /dev/null +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeFuture.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.wso2.transport.http.netty.contract.websocket; + +import org.wso2.transport.http.netty.message.HttpCarbonResponse; + +/** + * Future for WebSocket handshake. + */ +public interface ClientHandshakeFuture { + + /** + * Set the listener for WebSocket handshake. + * + * @param serverHandshakeListener Listener for WebSocket handshake. + * @return the same handshake future. + */ + ClientHandshakeFuture setClientHandshakeListener(ClientHandshakeListener serverHandshakeListener); + + /** + * Notify the success of the WebSocket handshake. + * + * @param webSocketConnection {@link WebSocketConnection} for the successful connection. + * @param response {@link HttpCarbonResponse} received from server. + */ + void notifySuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response); + + /** + * Notify any error occurred during the handshake. + * + * @param throwable error occurred during handshake. + * @param response {@link HttpCarbonResponse} received from server or null if error occurred before response is + * received. + */ + void notifyError(Throwable throwable, HttpCarbonResponse response); +} diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeListener.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeListener.java new file mode 100644 index 000000000..e11f5b078 --- /dev/null +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ClientHandshakeListener.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.wso2.transport.http.netty.contract.websocket; + +import org.wso2.transport.http.netty.message.HttpCarbonResponse; + +/** + * Future listener for WebSocket handshake. + */ +public interface ClientHandshakeListener { + + /** + * Notify the success of the handshake. + * + * @param webSocketConnection {@link WebSocketConnection} for the successful handshake. + */ + void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response); + + /** + * Notify error on handshake. + * @param t error occurred during handshake. + */ + void onError(Throwable t, HttpCarbonResponse response); + +} diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeFuture.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeFuture.java deleted file mode 100644 index e65c5719b..000000000 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeFuture.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.wso2.transport.http.netty.contract.websocket; - -/** - * Future for WebSocket handshake. - */ -public interface HandshakeFuture { - - /** - * Set the listener for WebSocket handshake. - * - * @param handshakeListener Listener for WebSocket handshake. - * @return the same handshake future. - */ - public HandshakeFuture setHandshakeListener(HandshakeListener handshakeListener); - - /** - * Notify the success of the WebSocket handshake. - * - * @param webSocketConnection {@link WebSocketConnection} for the successful connection. - */ - public void notifySuccess(WebSocketConnection webSocketConnection); - - /** - * Notify any error occurred during the handshake. - * - * @param throwable error occurred during handshake. - */ - public void notifyError(Throwable throwable); -} diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeFuture.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeFuture.java new file mode 100644 index 000000000..e695b7eb6 --- /dev/null +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeFuture.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.wso2.transport.http.netty.contract.websocket; + +/** + * Future for WebSocket handshake. + */ +public interface ServerHandshakeFuture { + + /** + * Set the listener for WebSocket handshake. + * + * @param serverHandshakeListener Listener for WebSocket handshake. + * @return the same handshake future. + */ + ServerHandshakeFuture setHandshakeListener(ServerHandshakeListener serverHandshakeListener); + + /** + * Notify the success of the WebSocket handshake. + * + * @param webSocketConnection {@link WebSocketConnection} for the successful connection. + */ + void notifySuccess(WebSocketConnection webSocketConnection); + + /** + * Notify any error occurred during the handshake. + * + * @param throwable error occurred during handshake. + */ + void notifyError(Throwable throwable); +} diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeListener.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeListener.java similarity index 88% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeListener.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeListener.java index fb431f627..bf1fbd695 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/HandshakeListener.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/ServerHandshakeListener.java @@ -22,19 +22,20 @@ /** * Future listener for WebSocket handshake. */ -public interface HandshakeListener { +public interface ServerHandshakeListener { /** * Notify the success of the handshake. * * @param webSocketConnection {@link WebSocketConnection} for the successful handshake. */ - public void onSuccess(WebSocketConnection webSocketConnection); + void onSuccess(WebSocketConnection webSocketConnection); /** * Notify error on handshake. + * * @param t error occurred during handshake. */ - public void onError(Throwable t); + void onError(Throwable t); } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnector.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnector.java index ecc1015ba..fb7fcfcd6 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnector.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnector.java @@ -28,7 +28,7 @@ public interface WebSocketClientConnector { * Connect to the remote server. * * @param connectorListener {@link WebSocketConnectorListener} to listen incoming messages. - * @return HandshakeFuture for the newly created connection. + * @return ClientHandshakeFuture for the newly created connection. */ - HandshakeFuture connect(WebSocketConnectorListener connectorListener); + ClientHandshakeFuture connect(WebSocketConnectorListener connectorListener); } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketInitMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketInitMessage.java index 5fcbaefc4..f1b6b26fc 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketInitMessage.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketInitMessage.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelFuture; import io.netty.handler.codec.http.HttpHeaders; +import org.wso2.transport.http.netty.message.HttpCarbonRequest; /** * This Message is used to handle WebSocket handshake. @@ -33,65 +34,73 @@ public interface WebSocketInitMessage extends WebSocketMessage { * * @return the Server session for the newly created WebSocket connection. */ - HandshakeFuture handshake(); + ServerHandshakeFuture handshake(); /** * Complete the handshake of a given request. There will not be a idle timeout for the connection if this * method is used. * - * @param subProtocols Sub-Protocols which are allowed by the service. + * @param subProtocols Sub-Protocols which are allowed by the service. * @param allowExtensions whether the extensions are allowed or not. * @return the Server session for the newly created WebSocket connection. */ - HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions); + ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions); /** * Complete the handshake of a given request. The connection will be timed out if the connection is idle for * given time period. * - * @param subProtocols Sub-Protocols which are allowed by the service. + * @param subProtocols Sub-Protocols which are allowed by the service. * @param allowExtensions whether the extensions are allowed or not. - * @param idleTimeout Idle timeout in milli-seconds for WebSocket connection. + * @param idleTimeout Idle timeout in milli-seconds for WebSocket connection. * @return the handshake future. */ - HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout); + ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout); /** * Complete the handshake of a given request. The connection will be timed out if the connection is idle for given * time period. * - * @param subProtocols Sub-Protocols which are allowed by the service. + * @param subProtocols Sub-Protocols which are allowed by the service. * @param allowExtensions whether the extensions are allowed or not. - * @param idleTimeout Idle timeout in milli-seconds for WebSocket connection. + * @param idleTimeout Idle timeout in milli-seconds for WebSocket connection. * @param responseHeaders Extra headers to add to the handshake response or {@code null} if no extra headers should * be added * @return the handshake future. */ - HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, - HttpHeaders responseHeaders); + ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, + HttpHeaders responseHeaders); /** * Complete the handshake of a given request. The connection will be timed out if the connection is idle for given * time period. * - * @param subProtocols Sub-Protocols which are allowed by the service. - * @param allowExtensions whether the extensions are allowed or not. - * @param idleTimeout Idle timeout in milli-seconds for WebSocket connection. - * @param responseHeaders Extra headers to add to the handshake response or {@code null} if no extra headers should - * be added. + * @param subProtocols Sub-Protocols which are allowed by the service. + * @param allowExtensions whether the extensions are allowed or not. + * @param idleTimeout Idle timeout in milli-seconds for WebSocket connection. + * @param responseHeaders Extra headers to add to the handshake response or {@code null} if no extra + * headers should + * be added. * @param maxFramePayloadLength Maximum allowable frame payload length. Setting this value to your application's - * requirement may reduce denial of service attacks using long data frames. + * requirement may reduce denial of service attacks using long data frames. * @return the handshake future. */ - HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, - HttpHeaders responseHeaders, int maxFramePayloadLength); + ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, + HttpHeaders responseHeaders, int maxFramePayloadLength); + + /** + * Get the HttpCarbonRequest received from the client to perform the handshake. + * + * @return the carbon request received to perform the handshake + */ + HttpCarbonRequest getHttpCarbonRequest(); + /** * Cancel the handshake with HTTP response. * - * @param closeCode close code for cancelling the handshake. + * @param closeCode close code for cancelling the handshake. * @param closeReason reason for canceling the handshake. - * * @return the ChannelPromise created after submitting response */ ChannelFuture cancelHandshake(int closeCode, String closeReason); diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketMessage.java index 99591b87d..2474ed173 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketMessage.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketMessage.java @@ -19,8 +19,6 @@ package org.wso2.transport.http.netty.contract.websocket; -import java.util.Map; - /** * This is the common interface for all WebSocket messages. * Note: Use this interface in the application level only and only if the user needs only the channel details @@ -71,36 +69,6 @@ public interface WebSocketMessage { */ WebSocketConnection getWebSocketConnection(); - /** - * Set header for the message. - * - * @param key key of the header. - * @param value value of the header. - */ - void setHeader(String key, String value); - - /** - * Set headers for the message. - * - * @param headers map of headers which should be added to the current headers. - */ - void setHeaders(Map headers); - - /** - * Get the value of a header. - * - * @param key key of the header. - * @return the value of the header. - */ - String getHeader(String key); - - /** - * Get a map of all headers. - * - * @return a map of all headers. - */ - Map getHeaders(); - /** * Retrieve the session ID. * diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java new file mode 100644 index 000000000..9b415f91d --- /dev/null +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.wso2.transport.http.netty.contractimpl.websocket; + +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; +import org.wso2.transport.http.netty.message.HttpCarbonResponse; + +/** + * Implementation of WebSocket handshake future. + */ +public class DefaultClientHandshakeFuture implements ClientHandshakeFuture { + + private Throwable throwable = null; + private WebSocketConnection webSocketConnection = null; + private ClientHandshakeListener clientHandshakeListener; + private HttpCarbonResponse response; + + public DefaultClientHandshakeFuture() { + } + + @Override + public ClientHandshakeFuture setClientHandshakeListener(ClientHandshakeListener clientHandshakeListener) { + this.clientHandshakeListener = clientHandshakeListener; + if (throwable != null) { + clientHandshakeListener.onError(throwable, response); + } + if (webSocketConnection != null && response != null) { + clientHandshakeListener.onSuccess(webSocketConnection, response); + } + return this; + } + + @Override + public void notifySuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { + this.webSocketConnection = webSocketConnection; + this.response = response; + if (clientHandshakeListener == null || throwable != null) { + return; + } + clientHandshakeListener.onSuccess(webSocketConnection, response); + } + + @Override + public void notifyError(Throwable throwable, HttpCarbonResponse response) { + this.response = response; + this.throwable = throwable; + if (clientHandshakeListener == null) { + return; + } + clientHandshakeListener.onError(throwable, response); + } + + +} diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/HandshakeFutureImpl.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultServerHandshakeFuture.java similarity index 62% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/HandshakeFutureImpl.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultServerHandshakeFuture.java index 9c06a323f..6d57cb79d 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/HandshakeFutureImpl.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultServerHandshakeFuture.java @@ -19,30 +19,30 @@ package org.wso2.transport.http.netty.contractimpl.websocket; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; -import org.wso2.transport.http.netty.contract.websocket.HandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeListener; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; /** * Implementation of WebSocket handshake future. */ -public class HandshakeFutureImpl implements HandshakeFuture { +public class DefaultServerHandshakeFuture implements ServerHandshakeFuture { private Throwable throwable = null; private WebSocketConnection webSocketConnection = null; - private HandshakeListener handshakeListener; + private ServerHandshakeListener serverHandshakeListener; - public HandshakeFutureImpl() { + public DefaultServerHandshakeFuture() { } @Override - public HandshakeFuture setHandshakeListener(HandshakeListener handshakeListener) { - this.handshakeListener = handshakeListener; + public ServerHandshakeFuture setHandshakeListener(ServerHandshakeListener serverHandshakeListener) { + this.serverHandshakeListener = serverHandshakeListener; if (throwable != null) { - handshakeListener.onError(throwable); + serverHandshakeListener.onError(throwable); } if (webSocketConnection != null) { - handshakeListener.onSuccess(webSocketConnection); + serverHandshakeListener.onSuccess(webSocketConnection); } return this; } @@ -50,19 +50,19 @@ public HandshakeFuture setHandshakeListener(HandshakeListener handshakeListener) @Override public void notifySuccess(WebSocketConnection webSocketConnection) { this.webSocketConnection = webSocketConnection; - if (handshakeListener == null || throwable != null) { + if (serverHandshakeListener == null || throwable != null) { return; } - handshakeListener.onSuccess(webSocketConnection); + serverHandshakeListener.onSuccess(webSocketConnection); } @Override public void notifyError(Throwable throwable) { this.throwable = throwable; - if (handshakeListener == null) { + if (serverHandshakeListener == null) { return; } - handshakeListener.onError(throwable); + serverHandshakeListener.onError(throwable); } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java index ee5cb88b0..ec6660a21 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java @@ -20,7 +20,7 @@ package org.wso2.transport.http.netty.contractimpl.websocket; import io.netty.channel.EventLoopGroup; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture; import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnectorListener; import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig; @@ -51,7 +51,7 @@ public DefaultWebSocketClientConnector(WsClientConnectorConfig clientConnectorCo } @Override - public HandshakeFuture connect(WebSocketConnectorListener connectorListener) { + public ClientHandshakeFuture connect(WebSocketConnectorListener connectorListener) { WebSocketClient webSocketClient = new WebSocketClient(remoteUrl, subProtocols, idleTimeout, wsClientEventLoopGroup, customHeaders, connectorListener, autoRead); return webSocketClient.handshake(); diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketMessageImpl.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java similarity index 79% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketMessageImpl.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java index 2f1b0c2e5..34b8c493c 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketMessageImpl.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java @@ -28,7 +28,7 @@ /** * Implementation of {@link WebSocketMessage}. */ -public class WebSocketMessageImpl implements WebSocketMessage { +public class DefaultWebSocketMessage implements WebSocketMessage { private final Map properties = new HashMap<>(); protected String subProtocol; @@ -38,16 +38,13 @@ public class WebSocketMessageImpl implements WebSocketMessage { protected boolean isServerMessage; protected WebSocketConnection webSocketConnection; protected String sessionlID; - protected Map headers = new HashMap<>(); public void setProperty(String key, Object value) { properties.put(key, value); } public void setProperties(Map properties) { - properties.entrySet().forEach( - entry -> this.properties.put(entry.getKey(), entry.getValue()) - ); + properties.forEach(this.properties::put); } public Object getProperty(String key) { @@ -95,12 +92,13 @@ public String getListenerInterface() { public void setIsConnectionSecured(boolean isConnectionSecured) { this.isConnectionSecured = isConnectionSecured; } + @Override public boolean isConnectionSecured() { return isConnectionSecured; } - public void setIsServerMessage(boolean isServerMessage) { + public void setIsServerMessage(boolean isServerMessage) { this.isServerMessage = isServerMessage; } @@ -118,28 +116,6 @@ public WebSocketConnection getWebSocketConnection() { return webSocketConnection; } - @Override - public void setHeader(String key, String value) { - headers.put(key, value); - } - - @Override - public void setHeaders(Map headers) { - headers.entrySet().forEach( - entry -> this.headers.put(entry.getKey(), entry.getValue()) - ); - } - - @Override - public String getHeader(String key) { - return headers.get(key); - } - - @Override - public Map getHeaders() { - return headers; - } - @Override public String getSessionID() { return sessionlID; diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketBinaryMessageImpl.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketBinaryMessage.java similarity index 85% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketBinaryMessageImpl.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketBinaryMessage.java index fb0ebc0ba..8244a517b 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketBinaryMessageImpl.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketBinaryMessage.java @@ -20,19 +20,19 @@ package org.wso2.transport.http.netty.contractimpl.websocket.message; import org.wso2.transport.http.netty.contract.websocket.WebSocketBinaryMessage; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; import java.nio.ByteBuffer; /** * Implementation of {@link WebSocketBinaryMessage}. */ -public class WebSocketBinaryMessageImpl extends WebSocketMessageImpl implements WebSocketBinaryMessage { +public class DefaultWebSocketBinaryMessage extends DefaultWebSocketMessage implements WebSocketBinaryMessage { private final ByteBuffer buffer; private final boolean isFinalFragment; - public WebSocketBinaryMessageImpl(ByteBuffer buffer, boolean isFinalFragment) { + public DefaultWebSocketBinaryMessage(ByteBuffer buffer, boolean isFinalFragment) { this.buffer = buffer; this.isFinalFragment = isFinalFragment; } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketCloseMessageImpl.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketCloseMessage.java similarity index 79% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketCloseMessageImpl.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketCloseMessage.java index d9a665c6e..20b162eac 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketCloseMessageImpl.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketCloseMessage.java @@ -20,21 +20,21 @@ package org.wso2.transport.http.netty.contractimpl.websocket.message; import org.wso2.transport.http.netty.contract.websocket.WebSocketCloseMessage; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; /** * Implementation of {@link WebSocketCloseMessage}. */ -public class WebSocketCloseMessageImpl extends WebSocketMessageImpl implements WebSocketCloseMessage { +public class DefaultWebSocketCloseMessage extends DefaultWebSocketMessage implements WebSocketCloseMessage { private final int closeCode; private final String closeReason; - public WebSocketCloseMessageImpl(int closeCode) { + public DefaultWebSocketCloseMessage(int closeCode) { this(closeCode, null); } - public WebSocketCloseMessageImpl(int closeCode, String closeReason) { + public DefaultWebSocketCloseMessage(int closeCode, String closeReason) { this.closeCode = closeCode; this.closeReason = closeReason; } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketControlMessageImpl.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java similarity index 85% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketControlMessageImpl.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java index bd254a201..0b38b6343 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketControlMessageImpl.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java @@ -21,19 +21,19 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketControlMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketControlSignal; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; import java.nio.ByteBuffer; /** * Implementation of WebSocket control message. */ -public class WebSocketControlMessageImpl extends WebSocketMessageImpl implements WebSocketControlMessage { +public class DefaultWebSocketControlMessage extends DefaultWebSocketMessage implements WebSocketControlMessage { private final WebSocketControlSignal controlSignal; private final ByteBuffer buffer; - public WebSocketControlMessageImpl(WebSocketControlSignal controlSignal, ByteBuffer buffer) { + public DefaultWebSocketControlMessage(WebSocketControlSignal controlSignal, ByteBuffer buffer) { this.controlSignal = controlSignal; this.buffer = buffer; } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java index 4d6e6ece7..3bdf98290 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java @@ -35,23 +35,22 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.handler.timeout.IdleStateHandler; import org.wso2.transport.http.netty.common.Constants; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeFuture; import org.wso2.transport.http.netty.contract.websocket.WebSocketInitMessage; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultServerHandshakeFuture; import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnection; -import org.wso2.transport.http.netty.contractimpl.websocket.HandshakeFutureImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; import org.wso2.transport.http.netty.internal.websocket.WebSocketUtil; import org.wso2.transport.http.netty.listener.WebSocketSourceHandler; import org.wso2.transport.http.netty.message.HttpCarbonRequest; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.concurrent.TimeUnit; /** * Implementation of {@link WebSocketInitMessage}. */ -public class DefaultWebSocketInitMessage extends WebSocketMessageImpl implements WebSocketInitMessage { +public class DefaultWebSocketInitMessage extends DefaultWebSocketMessage implements WebSocketInitMessage { private final ChannelHandlerContext ctx; private final FullHttpRequest httpRequest; @@ -61,16 +60,15 @@ public class DefaultWebSocketInitMessage extends WebSocketMessageImpl implements private HttpCarbonRequest request; public DefaultWebSocketInitMessage(ChannelHandlerContext ctx, FullHttpRequest httpRequest, - WebSocketSourceHandler webSocketSourceHandler, Map headers) { + WebSocketSourceHandler webSocketSourceHandler) { this.ctx = ctx; this.httpRequest = httpRequest; this.webSocketSourceHandler = webSocketSourceHandler; - this.headers = headers; this.sessionlID = WebSocketUtil.getSessionID(ctx); } @Override - public HandshakeFuture handshake() { + public ServerHandshakeFuture handshake() { WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketURL(httpRequest), null, true); WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(httpRequest); @@ -78,7 +76,7 @@ public HandshakeFuture handshake() { } @Override - public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions) { + public ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions) { WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketURL(httpRequest), getSubProtocolsCSV(subProtocols), allowExtensions); @@ -87,7 +85,7 @@ public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions) } @Override - public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout) { + public ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout) { WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketURL(httpRequest), getSubProtocolsCSV(subProtocols), allowExtensions); @@ -96,8 +94,8 @@ public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, } @Override - public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, - HttpHeaders responseHeaders) { + public ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, + HttpHeaders responseHeaders) { WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketURL(httpRequest), getSubProtocolsCSV(subProtocols), allowExtensions); @@ -106,8 +104,8 @@ public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, } @Override - public HandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, - HttpHeaders responseHeaders, int maxFramePayloadLength) { + public ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExtensions, int idleTimeout, + HttpHeaders responseHeaders, int maxFramePayloadLength) { WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketURL(httpRequest), getSubProtocolsCSV(subProtocols), allowExtensions, maxFramePayloadLength); @@ -155,9 +153,9 @@ public boolean isHandshakeStarted() { return handshakeStarted; } - private HandshakeFuture handleHandshake(WebSocketServerHandshaker handshaker, int idleTimeout, - HttpHeaders headers) { - HandshakeFutureImpl handshakeFuture = new HandshakeFutureImpl(); + private ServerHandshakeFuture handleHandshake(WebSocketServerHandshaker handshaker, int idleTimeout, + HttpHeaders headers) { + DefaultServerHandshakeFuture handshakeFuture = new DefaultServerHandshakeFuture(); if (cancelled) { Throwable e = new IllegalAccessException("Handshake is already cancelled!"); @@ -212,8 +210,7 @@ private String getWebSocketURL(HttpRequest req) { if (isConnectionSecured) { protocol = Constants.WEBSOCKET_PROTOCOL_SECURED; } - String url = protocol + "://" + req.headers().get("Host") + req.uri(); - return url; + return protocol + "://" + req.headers().get("Host") + req.uri(); } private String getSubProtocolsCSV(String[] subProtocols) { diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketTextMessageImpl.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketTextMessage.java similarity index 79% rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketTextMessageImpl.java rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketTextMessage.java index b42466df7..18febc67e 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/WebSocketTextMessageImpl.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketTextMessage.java @@ -20,23 +20,23 @@ package org.wso2.transport.http.netty.contractimpl.websocket.message; import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; /** * Implementation of {@link WebSocketTextMessage}. */ -public class WebSocketTextMessageImpl extends WebSocketMessageImpl implements WebSocketTextMessage { +public class DefaultWebSocketTextMessage extends DefaultWebSocketMessage implements WebSocketTextMessage { private final String text; private final boolean isFinalFragment; - public WebSocketTextMessageImpl(String text) { + public DefaultWebSocketTextMessage(String text) { this.text = text; this.isFinalFragment = true; } - public WebSocketTextMessageImpl(String text, boolean isFinalFragment) { + public DefaultWebSocketTextMessage(String text, boolean isFinalFragment) { this.text = text; this.isFinalFragment = isFinalFragment; } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java index 67b359075..45fd14d57 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java @@ -25,11 +25,11 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketControlMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketControlSignal; import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnection; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketBinaryMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketControlMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketTextMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketBinaryMessage; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketControlMessage; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketTextMessage; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -55,21 +55,24 @@ public static WebSocketControlMessage getWebsocketControlMessage(WebSocketFrame WebSocketControlSignal controlSignal) { ByteBuf content = webSocketFrame.content(); ByteBuffer clonedContent = getClonedByteBuf(content); - WebSocketControlMessage webSocketControlMessage = new WebSocketControlMessageImpl(controlSignal, clonedContent); + WebSocketControlMessage webSocketControlMessage = new DefaultWebSocketControlMessage(controlSignal, + clonedContent); webSocketFrame.release(); return webSocketControlMessage; } - public static WebSocketMessageImpl getWebSocketMessage(WebSocketFrame frame, String text, boolean isFinalFragment) { - WebSocketMessageImpl webSocketTextMessage = new WebSocketTextMessageImpl(text, isFinalFragment); + public static DefaultWebSocketMessage getWebSocketMessage(WebSocketFrame frame, String text, + boolean isFinalFragment) { + DefaultWebSocketMessage webSocketTextMessage = new DefaultWebSocketTextMessage(text, isFinalFragment); frame.release(); return webSocketTextMessage; } - public static WebSocketMessageImpl getWebSocketMessage(WebSocketFrame webSocketFrame, ByteBuf content, - boolean finalFragment) { + public static DefaultWebSocketMessage getWebSocketMessage(WebSocketFrame webSocketFrame, ByteBuf content, + boolean finalFragment) { ByteBuffer clonedContent = getClonedByteBuf(content); - WebSocketMessageImpl webSocketBinaryMessage = new WebSocketBinaryMessageImpl(clonedContent, finalFragment); + DefaultWebSocketMessage webSocketBinaryMessage = new DefaultWebSocketBinaryMessage(clonedContent, + finalFragment); webSocketFrame.release(); return webSocketBinaryMessage; } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketServerHandshakeHandler.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketServerHandshakeHandler.java index fb64bd70b..d22b04e1d 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketServerHandshakeHandler.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketServerHandshakeHandler.java @@ -42,8 +42,6 @@ import org.wso2.transport.http.netty.message.PooledDataStreamerFactory; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; /** * WebSocket handshake handler for carbon transports. @@ -61,7 +59,7 @@ public WebSocketServerHandshakeHandler(ServerConnectorFuture serverConnectorFutu } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpRequest) { HttpRequest httpRequest = (HttpRequest) msg; @@ -86,14 +84,14 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) thro } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Remove ourselves and fail the handshake ctx.pipeline().remove(this); ctx.fireExceptionCaught(cause); } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) { ctx.fireChannelInactive(); } }); @@ -139,17 +137,13 @@ private void handleWebSocketHandshake(FullHttpRequest fullHttpRequest, ChannelHa } String uri = fullHttpRequest.uri(); - Map headers = new HashMap<>(); - fullHttpRequest.headers().forEach( - header -> headers.put(header.getKey(), header.getValue()) - ); WebSocketSourceHandler webSocketSourceHandler = new WebSocketSourceHandler(serverConnectorFuture, isSecured, fullHttpRequest, - headers, ctx, interfaceId); + ctx, interfaceId); DefaultWebSocketConnection webSocketConnection = WebSocketUtil.getWebSocketConnection(webSocketSourceHandler, isSecured, uri); DefaultWebSocketInitMessage initMessage = new DefaultWebSocketInitMessage(ctx, fullHttpRequest, - webSocketSourceHandler, headers); + webSocketSourceHandler); // Setting common properties for init message initMessage.setWebSocketConnection(webSocketConnection); diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketSourceHandler.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketSourceHandler.java index 637420908..d8969fd2c 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketSourceHandler.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketSourceHandler.java @@ -42,17 +42,16 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketFrameType; import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnection; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketCloseMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketControlMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketCloseMessage; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketControlMessage; import org.wso2.transport.http.netty.exception.UnknownWebSocketFrameTypeException; import org.wso2.transport.http.netty.internal.HTTPTransportContextHolder; import org.wso2.transport.http.netty.internal.HandlerExecutor; import org.wso2.transport.http.netty.internal.websocket.WebSocketUtil; import java.net.InetSocketAddress; -import java.util.Map; import java.util.concurrent.CountDownLatch; /** @@ -66,7 +65,6 @@ public class WebSocketSourceHandler extends WebSocketInboundFrameHandler { private final ChannelHandlerContext ctx; private final boolean isSecured; private final ServerConnectorFuture connectorFuture; - private final Map headers; private final String interfaceId; private String subProtocol = null; private HandlerExecutor handlerExecutor; @@ -79,18 +77,16 @@ public class WebSocketSourceHandler extends WebSocketInboundFrameHandler { * @param connectorFuture {@link ServerConnectorFuture} to notify messages to application. * @param isSecured indication of whether the connection is secured or not. * @param httpRequest {@link FullHttpRequest} which contains the details of WebSocket Upgrade. - * @param headers Headers obtained from HTTP WebSocket upgrade request. * @param ctx {@link ChannelHandlerContext} of WebSocket connection. * @param interfaceId given ID for the socket interface. */ public WebSocketSourceHandler(ServerConnectorFuture connectorFuture, boolean isSecured, FullHttpRequest httpRequest, - Map headers, ChannelHandlerContext ctx, String interfaceId) { + ChannelHandlerContext ctx, String interfaceId) { this.connectorFuture = connectorFuture; this.isSecured = isSecured; this.ctx = ctx; this.interfaceId = interfaceId; this.target = httpRequest.uri(); - this.headers = headers; } /** @@ -157,8 +153,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (!(this.closeFrameReceived || webSocketConnection.closeFrameSent())) { // Notify abnormal closure. - WebSocketMessageImpl webSocketCloseMessage = - new WebSocketCloseMessageImpl(Constants.WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE); + DefaultWebSocketMessage webSocketCloseMessage = + new DefaultWebSocketCloseMessage(Constants.WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE); setupCommonProperties(webSocketCloseMessage); connectorFuture.notifyWSListener((WebSocketCloseMessage) webSocketCloseMessage); } @@ -208,14 +204,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) private void notifyTextMessage(WebSocketFrame frame, String text, boolean finalFragment) throws ServerConnectorException { - WebSocketMessageImpl webSocketTextMessage = WebSocketUtil.getWebSocketMessage(frame, text, finalFragment); + DefaultWebSocketMessage webSocketTextMessage = WebSocketUtil.getWebSocketMessage(frame, text, finalFragment); setupCommonProperties(webSocketTextMessage); connectorFuture.notifyWSListener((WebSocketTextMessage) webSocketTextMessage); } private void notifyBinaryMessage(WebSocketFrame frame, ByteBuf content, boolean finalFragment) throws ServerConnectorException { - WebSocketMessageImpl webSocketBinaryMessage = WebSocketUtil.getWebSocketMessage(frame, content, finalFragment); + DefaultWebSocketMessage webSocketBinaryMessage = WebSocketUtil.getWebSocketMessage(frame, content, + finalFragment); setupCommonProperties(webSocketBinaryMessage); connectorFuture.notifyWSListener((WebSocketBinaryMessage) webSocketBinaryMessage); } @@ -226,7 +223,7 @@ private void notifyCloseMessage(CloseWebSocketFrame closeWebSocketFrame) throws // closeCountDownLatch == null means that WebSocketConnection has not yet initiated a connection closure. if (closeCountDownLatch == null) { - WebSocketMessageImpl webSocketCloseMessage = new WebSocketCloseMessageImpl(statusCode, reasonText); + DefaultWebSocketMessage webSocketCloseMessage = new DefaultWebSocketCloseMessage(statusCode, reasonText); setupCommonProperties(webSocketCloseMessage); connectorFuture.notifyWSListener((WebSocketCloseMessage) webSocketCloseMessage); closeFrameReceived = true; @@ -240,32 +237,31 @@ private void notifyCloseMessage(CloseWebSocketFrame closeWebSocketFrame) throws private void notifyPingMessage(PingWebSocketFrame pingWebSocketFrame) throws ServerConnectorException { WebSocketControlMessage webSocketControlMessage = WebSocketUtil. getWebsocketControlMessage(pingWebSocketFrame, WebSocketControlSignal.PING); - setupCommonProperties((WebSocketMessageImpl) webSocketControlMessage); + setupCommonProperties((DefaultWebSocketMessage) webSocketControlMessage); connectorFuture.notifyWSListener(webSocketControlMessage); } private void notifyPongMessage(PongWebSocketFrame pongWebSocketFrame) throws ServerConnectorException { WebSocketControlMessage webSocketControlMessage = WebSocketUtil. getWebsocketControlMessage(pongWebSocketFrame, WebSocketControlSignal.PONG); - setupCommonProperties((WebSocketMessageImpl) webSocketControlMessage); + setupCommonProperties((DefaultWebSocketMessage) webSocketControlMessage); connectorFuture.notifyWSListener(webSocketControlMessage); } private void notifyIdleTimeout() throws ServerConnectorException { - WebSocketMessageImpl websocketControlMessage = new WebSocketControlMessageImpl( + DefaultWebSocketMessage websocketControlMessage = new DefaultWebSocketControlMessage( WebSocketControlSignal.IDLE_TIMEOUT, null); setupCommonProperties(websocketControlMessage); connectorFuture.notifyWSIdleTimeout((WebSocketControlMessage) websocketControlMessage); } - private void setupCommonProperties(WebSocketMessageImpl webSocketMessage) { + private void setupCommonProperties(DefaultWebSocketMessage webSocketMessage) { webSocketMessage.setSubProtocol(subProtocol); webSocketMessage.setTarget(target); webSocketMessage.setListenerInterface(interfaceId); webSocketMessage.setIsConnectionSecured(isSecured); webSocketMessage.setIsServerMessage(true); webSocketMessage.setWebSocketConnection(webSocketConnection); - webSocketMessage.setHeaders(headers); webSocketMessage.setSessionlID(webSocketConnection.getId()); webSocketMessage.setProperty(Constants.SRC_HANDLER, this); @@ -278,8 +274,7 @@ private void setupCommonProperties(WebSocketMessageImpl webSocketMessage) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.channel().writeAndFlush(new CloseWebSocketFrame(1011, - "Encountered an unexpected condition")); + ctx.channel().writeAndFlush(new CloseWebSocketFrame(1011, "Encountered an unexpected condition")); ctx.close(); connectorFuture.notifyWSListener(cause); } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketClient.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketClient.java index 6df646410..df47f8d7c 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketClient.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketClient.java @@ -41,10 +41,10 @@ import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnectorListener; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultClientHandshakeFuture; import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnection; -import org.wso2.transport.http.netty.contractimpl.websocket.HandshakeFutureImpl; import java.net.URI; import java.util.Map; @@ -95,8 +95,8 @@ public WebSocketClient(String url, String subProtocols, int idleTimeout, EventLo * * @return handshake future for connection. */ - public HandshakeFuture handshake() { - HandshakeFutureImpl handshakeFuture = new HandshakeFutureImpl(); + public ClientHandshakeFuture handshake() { + ClientHandshakeFuture handshakeFuture = new DefaultClientHandshakeFuture(); try { URI uri = new URI(url); String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); @@ -152,13 +152,17 @@ protected void initChannel(SocketChannel ch) { String actualSubProtocol = websocketHandshaker.actualSubprotocol(); webSocketTargetHandler.setActualSubProtocol(actualSubProtocol); webSocketConnection.getDefaultWebSocketSession().setNegotiatedSubProtocol(actualSubProtocol); - handshakeFuture.notifySuccess(webSocketConnection); + handshakeFuture.notifySuccess(webSocketConnection, webSocketTargetHandler.getHttpCarbonResponse()); } else { - handshakeFuture.notifyError(cause); + handshakeFuture.notifyError(cause, webSocketTargetHandler.getHttpCarbonResponse()); } }); } catch (Throwable t) { - handshakeFuture.notifyError(t); + if (webSocketTargetHandler != null) { + handshakeFuture.notifyError(t, webSocketTargetHandler.getHttpCarbonResponse()); + } else { + handshakeFuture.notifyError(t, null); + } } return handshakeFuture; } diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketTargetHandler.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketTargetHandler.java index 75a7dbdf0..bf8f1cd3f 100644 --- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketTargetHandler.java +++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/sender/websocket/WebSocketTargetHandler.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; @@ -47,12 +48,14 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketFrameType; import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketConnection; +import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage; import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler; -import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketCloseMessageImpl; -import org.wso2.transport.http.netty.contractimpl.websocket.message.WebSocketControlMessageImpl; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketCloseMessage; +import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketControlMessage; import org.wso2.transport.http.netty.exception.UnknownWebSocketFrameTypeException; import org.wso2.transport.http.netty.internal.websocket.WebSocketUtil; +import org.wso2.transport.http.netty.message.DefaultListener; +import org.wso2.transport.http.netty.message.HttpCarbonResponse; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -79,6 +82,7 @@ public class WebSocketTargetHandler extends WebSocketInboundFrameHandler { private WebSocketFrameType continuationFrameType; private boolean closeFrameReceived; private CountDownLatch closeCountDownLatch = null; + private HttpCarbonResponse httpCarbonResponse; public WebSocketTargetHandler(WebSocketClientHandshaker handshaker, boolean isSecure, String requestedUri, WebSocketConnectorListener webSocketConnectorListener) { @@ -107,6 +111,10 @@ public DefaultWebSocketConnection getWebSocketConnection() { return webSocketConnection; } + public HttpCarbonResponse getHttpCarbonResponse() { + return httpCarbonResponse; + } + @Override public boolean isCloseFrameReceived() { return closeFrameReceived; @@ -130,19 +138,19 @@ public void channelActive(ChannelHandlerContext ctx) throws URISyntaxException { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws ServerConnectorException { + public void channelInactive(ChannelHandlerContext ctx) { if (webSocketConnection != null && !(webSocketConnection.closeFrameReceived() || webSocketConnection.closeFrameSent())) { // Notify abnormal closure. - WebSocketMessageImpl webSocketCloseMessage = - new WebSocketCloseMessageImpl(Constants.WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE); + DefaultWebSocketMessage webSocketCloseMessage = + new DefaultWebSocketCloseMessage(Constants.WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE); setupCommonProperties(webSocketCloseMessage, ctx); connectorListener.onMessage((WebSocketCloseMessage) webSocketCloseMessage); } } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleStateEvent.ALL_IDLE_STATE_EVENT.state()) { @@ -157,6 +165,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { FullHttpResponse fullHttpResponse = (FullHttpResponse) msg; + httpCarbonResponse = setUpCarbonMessage(ctx, fullHttpResponse); handshaker.finishHandshake(ch, fullHttpResponse); log.debug("WebSocket Client connected!"); handshakeFuture.setSuccess(); @@ -209,17 +218,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) } } - private void notifyTextMessage(WebSocketFrame frame, String text, boolean finalFragment, ChannelHandlerContext ctx) - throws ServerConnectorException { - WebSocketMessageImpl webSocketTextMessage = WebSocketUtil.getWebSocketMessage(frame, text, finalFragment); + private void notifyTextMessage(WebSocketFrame frame, String text, boolean finalFragment, + ChannelHandlerContext ctx) { + DefaultWebSocketMessage webSocketTextMessage = WebSocketUtil.getWebSocketMessage(frame, text, finalFragment); setupCommonProperties(webSocketTextMessage, ctx); connectorListener.onMessage((WebSocketTextMessage) webSocketTextMessage); } private void notifyBinaryMessage(WebSocketFrame frame, ByteBuf content, boolean finalFragment, - ChannelHandlerContext ctx) - throws ServerConnectorException { - WebSocketMessageImpl webSocketBinaryMessage = WebSocketUtil.getWebSocketMessage(frame, content, finalFragment); + ChannelHandlerContext ctx) { + DefaultWebSocketMessage webSocketBinaryMessage = WebSocketUtil.getWebSocketMessage(frame, content, + finalFragment); setupCommonProperties(webSocketBinaryMessage, ctx); connectorListener.onMessage((WebSocketBinaryMessage) webSocketBinaryMessage); } @@ -232,7 +241,7 @@ private void notifyCloseMessage(CloseWebSocketFrame closeWebSocketFrame, Channel throw new ServerConnectorException("Cannot find initialized channel session"); } if (closeCountDownLatch == null) { - WebSocketMessageImpl webSocketCloseMessage = new WebSocketCloseMessageImpl(statusCode, reasonText); + DefaultWebSocketMessage webSocketCloseMessage = new DefaultWebSocketCloseMessage(statusCode, reasonText); setupCommonProperties(webSocketCloseMessage, ctx); connectorListener.onMessage((WebSocketCloseMessage) webSocketCloseMessage); closeFrameReceived = true; @@ -243,31 +252,29 @@ private void notifyCloseMessage(CloseWebSocketFrame closeWebSocketFrame, Channel closeWebSocketFrame.release(); } - private void notifyPingMessage(PingWebSocketFrame pingWebSocketFrame, ChannelHandlerContext ctx) - throws ServerConnectorException { + private void notifyPingMessage(PingWebSocketFrame pingWebSocketFrame, ChannelHandlerContext ctx) { WebSocketControlMessage webSocketControlMessage = WebSocketUtil. getWebsocketControlMessage(pingWebSocketFrame, WebSocketControlSignal.PING); - setupCommonProperties((WebSocketMessageImpl) webSocketControlMessage, ctx); + setupCommonProperties((DefaultWebSocketMessage) webSocketControlMessage, ctx); connectorListener.onMessage(webSocketControlMessage); } - private void notifyPongMessage(PongWebSocketFrame pongWebSocketFrame, ChannelHandlerContext ctx) - throws ServerConnectorException { + private void notifyPongMessage(PongWebSocketFrame pongWebSocketFrame, ChannelHandlerContext ctx) { WebSocketControlMessage webSocketControlMessage = WebSocketUtil. getWebsocketControlMessage(pongWebSocketFrame, WebSocketControlSignal.PONG); - setupCommonProperties((WebSocketMessageImpl) webSocketControlMessage, ctx); + setupCommonProperties((DefaultWebSocketMessage) webSocketControlMessage, ctx); connectorListener.onMessage(webSocketControlMessage); } - private void notifyIdleTimeout(ChannelHandlerContext ctx) throws ServerConnectorException { - WebSocketMessageImpl websocketControlMessage = - new WebSocketControlMessageImpl(WebSocketControlSignal.IDLE_TIMEOUT, null); + private void notifyIdleTimeout(ChannelHandlerContext ctx) { + DefaultWebSocketMessage websocketControlMessage = + new DefaultWebSocketControlMessage(WebSocketControlSignal.IDLE_TIMEOUT, null); setupCommonProperties(websocketControlMessage, ctx); connectorListener.onIdleTimeout((WebSocketControlMessage) websocketControlMessage); } - private void setupCommonProperties(WebSocketMessageImpl webSocketChannelContext, - ChannelHandlerContext ctx) { + private void setupCommonProperties(DefaultWebSocketMessage webSocketChannelContext, + ChannelHandlerContext ctx) { webSocketChannelContext.setSubProtocol(actualSubProtocol); webSocketChannelContext.setIsConnectionSecured(isSecure); webSocketChannelContext.setWebSocketConnection(webSocketConnection); @@ -291,4 +298,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); connectorListener.onError(cause); } + + private HttpCarbonResponse setUpCarbonMessage(ChannelHandlerContext ctx, HttpResponse msg) { + HttpCarbonResponse carbonResponse = new HttpCarbonResponse(msg, new DefaultListener(ctx)); + carbonResponse.setProperty(Constants.DIRECTION, Constants.DIRECTION_RESPONSE); + carbonResponse.setProperty(Constants.HTTP_STATUS_CODE, msg.status().code()); + return carbonResponse; + } } diff --git a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/HttpToWsProtocolSwitchWebSocketListener.java b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/HttpToWsProtocolSwitchWebSocketListener.java index 94e3d8581..0b0880a59 100644 --- a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/HttpToWsProtocolSwitchWebSocketListener.java +++ b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/HttpToWsProtocolSwitchWebSocketListener.java @@ -27,6 +27,7 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketControlMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketInitMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; +import org.wso2.transport.http.netty.message.HttpCarbonRequest; /** * WebSocket connector listener for the Protocol switch from HTTP to WebSocket test case. @@ -37,9 +38,10 @@ public class HttpToWsProtocolSwitchWebSocketListener implements WebSocketConnect @Override public void onMessage(WebSocketInitMessage initMessage) { - if ("handshake".equals(initMessage.getHeader("Command"))) { + HttpCarbonRequest request = initMessage.getHttpCarbonRequest(); + if ("handshake".equals(request.getHeader("Command"))) { initMessage.handshake(); - } else if ("fail".equals(initMessage.getHeader("Command"))) { + } else if ("fail".equals(request.getHeader("Command"))) { initMessage.cancelHandshake(404, "Not Found"); } } diff --git a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketClientTestCase.java b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketClientTestCase.java index 04aaeae95..cfdae6715 100644 --- a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketClientTestCase.java +++ b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketClientTestCase.java @@ -25,16 +25,16 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import org.wso2.transport.http.netty.contract.ClientConnectorException; import org.wso2.transport.http.netty.contract.ServerConnectorException; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; -import org.wso2.transport.http.netty.contract.websocket.HandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener; import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector; import org.wso2.transport.http.netty.contract.websocket.WebSocketCloseMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnectorListener; import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig; import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory; +import org.wso2.transport.http.netty.message.HttpCarbonResponse; import org.wso2.transport.http.netty.util.TestUtil; import org.wso2.transport.http.netty.util.server.websocket.WebSocketRemoteServer; @@ -62,7 +62,7 @@ public class WebSocketClientTestCase { "xml, json"); @BeforeClass - public void setup() throws InterruptedException, ClientConnectorException { + public void setup() throws InterruptedException { remoteServer.run(); clientConnector = httpConnectorFactory.createWsClientConnector(configuration); } @@ -72,17 +72,17 @@ public void testTextReceived() throws Throwable { CountDownLatch latch = new CountDownLatch(1); String textSent = "testText"; WebSocketTestClientConnectorListener connectorListener = new WebSocketTestClientConnectorListener(latch); - HandshakeFuture handshakeFuture = handshake(connectorListener); - handshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture = handshake(connectorListener); + handshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushText(textSent); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); @@ -97,17 +97,17 @@ public void testBinaryReceived() throws Throwable { byte[] bytes = {1, 2, 3, 4, 5}; ByteBuffer bufferSent = ByteBuffer.wrap(bytes); WebSocketTestClientConnectorListener connectorListener = new WebSocketTestClientConnectorListener(latch); - HandshakeFuture handshakeFuture = handshake(connectorListener); - handshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture = handshake(connectorListener); + handshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushBinary(bufferSent); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); @@ -122,17 +122,17 @@ public void testPingPong() throws Throwable { CountDownLatch pingLatch = new CountDownLatch(1); WebSocketTestClientConnectorListener pingConnectorListener = new WebSocketTestClientConnectorListener(pingLatch); - HandshakeFuture pingHandshakeFuture = handshake(pingConnectorListener); - pingHandshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture pingHandshakeFuture = handshake(pingConnectorListener); + pingHandshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushText(PING); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); pingLatch.await(latchWaitTimeInSeconds, TimeUnit.SECONDS); @@ -142,19 +142,19 @@ public void onError(Throwable t) { CountDownLatch pongLatch = new CountDownLatch(1); WebSocketTestClientConnectorListener pongConnectorListener = new WebSocketTestClientConnectorListener(pongLatch); - HandshakeFuture pongHandshakeFuture = handshake(pongConnectorListener); - pongHandshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture pongHandshakeFuture = handshake(pongConnectorListener); + pongHandshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { byte[] bytes = {1, 2, 3, 4, 5}; ByteBuffer buffer = ByteBuffer.wrap(bytes); webSocketConnection.ping(buffer); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); pongLatch.await(latchWaitTimeInSeconds, TimeUnit.SECONDS); @@ -166,17 +166,17 @@ public void testMultipleClients() throws Throwable { CountDownLatch latch1 = new CountDownLatch(1); WebSocketTestClientConnectorListener connectorListener1 = new WebSocketTestClientConnectorListener(latch1); String[] textsSent = {"testText1", "testText2"}; - HandshakeFuture handshakeFuture1 = handshake(connectorListener1); - handshakeFuture1.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture1 = handshake(connectorListener1); + handshakeFuture1.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushText(textsSent[0]); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); @@ -185,26 +185,26 @@ public void onError(Throwable t) { CountDownLatch latch2 = new CountDownLatch(2); WebSocketTestClientConnectorListener connectorListener2 = new WebSocketTestClientConnectorListener(latch2); - HandshakeFuture handshakeFuture2 = handshake(connectorListener2); - handshakeFuture2.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture2 = handshake(connectorListener2); + handshakeFuture2.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { - for (int i = 0; i < textsSent.length; i++) { - webSocketConnection.pushText(textsSent[i]); + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { + for (String aTextsSent : textsSent) { + webSocketConnection.pushText(aTextsSent); } } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); latch2.await(latchWaitTimeInSeconds, TimeUnit.SECONDS); - for (int i = 0; i < textsSent.length; i++) { - Assert.assertEquals(connectorListener2.getReceivedTextToClient(), textsSent[i]); + for (String aTextsSent : textsSent) { + Assert.assertEquals(connectorListener2.getReceivedTextToClient(), aTextsSent); } } @@ -214,16 +214,16 @@ public void testIdleTimeout() throws Throwable { clientConnector = httpConnectorFactory.createWsClientConnector(configuration); CountDownLatch latch = new CountDownLatch(1); WebSocketTestClientConnectorListener connectorListener = new WebSocketTestClientConnectorListener(latch); - HandshakeFuture handshakeFuture = handshake(connectorListener); - handshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture = handshake(connectorListener); + handshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); @@ -239,18 +239,18 @@ public void testSubProtocolNegotiationSuccessful() throws InterruptedException { CountDownLatch latchSuccess = new CountDownLatch(1); WebSocketTestClientConnectorListener connectorListenerSuccess = new WebSocketTestClientConnectorListener(latchSuccess); - HandshakeFuture handshakeFutureSuccess = handshake(connectorListenerSuccess); - handshakeFutureSuccess.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFutureSuccess = handshake(connectorListenerSuccess); + handshakeFutureSuccess.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { Assert.assertEquals(webSocketConnection.getSession().getNegotiatedSubprotocol(), "json"); latchSuccess.countDown(); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, "Handshake failed: " + t.getMessage()); + Assert.fail("Handshake failed: " + t.getMessage()); latchSuccess.countDown(); } }); @@ -265,16 +265,16 @@ public void testSubProtocolNegotiationFail() throws InterruptedException { CountDownLatch latchFail = new CountDownLatch(1); WebSocketTestClientConnectorListener connectorListenerFail = new WebSocketTestClientConnectorListener(latchFail); - HandshakeFuture handshakeFutureFail = handshake(connectorListenerFail); - handshakeFutureFail.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFutureFail = handshake(connectorListenerFail); + handshakeFutureFail.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { - Assert.assertFalse(true, "Should not negotiate"); + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { + Assert.fail("Should not negotiate"); latchFail.countDown(); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); Assert.assertTrue(true, "Handshake failed: " + t.getMessage()); latchFail.countDown(); @@ -288,17 +288,17 @@ public void testConnectionClosureFromServerSide() throws Throwable { CountDownLatch latch = new CountDownLatch(1); String closeText = "close"; WebSocketTestClientConnectorListener connectorListener = new WebSocketTestClientConnectorListener(latch); - HandshakeFuture handshakeFuture = handshake(connectorListener); - handshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture = handshake(connectorListener); + handshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushText(closeText); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); @@ -314,24 +314,24 @@ public void testConnectionClosureFromServerSideWithoutCloseFrame() throws Throwa CountDownLatch latch = new CountDownLatch(1); String closeText = "close-without-frame"; WebSocketTestClientConnectorListener connectorListener = new WebSocketTestClientConnectorListener(latch); - HandshakeFuture handshakeFuture = handshake(connectorListener); - handshakeFuture.setHandshakeListener(new HandshakeListener() { + ClientHandshakeFuture handshakeFuture = handshake(connectorListener); + handshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushText(closeText); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); latch.await(latchWaitTimeInSeconds, TimeUnit.SECONDS); WebSocketCloseMessage closeMessage = connectorListener.getCloseMessage(); Assert.assertEquals(closeMessage.getCloseCode(), 1006); - Assert.assertEquals(closeMessage.getCloseReason(), null); + Assert.assertNull(closeMessage.getCloseReason()); Assert.assertTrue(connectorListener.isClosed()); } @@ -343,19 +343,19 @@ public void autoReadFalseTest() throws Throwable { configuration = new WsClientConnectorConfig(url); configuration.setAutoRead(false); clientConnector = httpConnectorFactory.createWsClientConnector(configuration); - HandshakeFuture handshakeFuture = handshake(connectorListener); + ClientHandshakeFuture handshakeFuture = handshake(connectorListener); AtomicReference wsConnection = new AtomicReference<>(); - handshakeFuture.setHandshakeListener(new HandshakeListener() { + handshakeFuture.setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection webSocketConnection) { + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) { webSocketConnection.pushText(textSent); wsConnection.set(webSocketConnection); } @Override - public void onError(Throwable t) { + public void onError(Throwable t, HttpCarbonResponse response) { log.error(t.getMessage()); - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); @@ -363,11 +363,11 @@ public void onError(Throwable t) { String textReceived = null; try { textReceived = connectorListener.getReceivedTextToClient(); - Assert.assertTrue(false, "Expected exception"); + Assert.fail("Expected exception"); } catch (NoSuchElementException ex) { Assert.assertTrue(true, "Expected exception thrown"); } - Assert.assertEquals(textReceived, null); + Assert.assertNull(textReceived); latch = new CountDownLatch(1); connectorListener.setCountDownLatch(latch); wsConnection.get().readNextFrame(); @@ -381,7 +381,7 @@ public void cleanUp() throws ServerConnectorException, InterruptedException { remoteServer.stop(); } - private HandshakeFuture handshake(WebSocketConnectorListener connectorListener) { + private ClientHandshakeFuture handshake(WebSocketConnectorListener connectorListener) { return clientConnector.connect(connectorListener); } } diff --git a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketMessagePropertiesConnectorListener.java b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketMessagePropertiesConnectorListener.java index b8c132b6e..cec72e7e1 100644 --- a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketMessagePropertiesConnectorListener.java +++ b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketMessagePropertiesConnectorListener.java @@ -22,7 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.wso2.transport.http.netty.contract.websocket.HandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeListener; import org.wso2.transport.http.netty.contract.websocket.WebSocketBinaryMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketCloseMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; @@ -30,6 +30,7 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketControlMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketInitMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; +import org.wso2.transport.http.netty.message.HttpCarbonRequest; /** * WebSocket connector listener to identify the properties of a message. @@ -43,14 +44,14 @@ public void onMessage(WebSocketInitMessage initMessage) { // Assert properties Assert.assertFalse(initMessage.isConnectionSecured()); Assert.assertEquals(initMessage.getTarget(), "/test"); - + HttpCarbonRequest request = initMessage.getHttpCarbonRequest(); // Assert custom headers - String checkSubProtocol = initMessage.getHeader("check-sub-protocol"); - Assert.assertEquals(initMessage.getHeader("message-type"), "websocket"); - Assert.assertEquals(initMessage.getHeader("message-sender"), "wso2"); + String checkSubProtocol = request.getHeader("check-sub-protocol"); + Assert.assertEquals(request.getHeader("message-type"), "websocket"); + Assert.assertEquals(request.getHeader("message-sender"), "wso2"); if ("true".equals(checkSubProtocol)) { String[] subProtocols = {"xml"}; - initMessage.handshake(subProtocols, true).setHandshakeListener(new HandshakeListener() { + initMessage.handshake(subProtocols, true).setHandshakeListener(new ServerHandshakeListener() { @Override public void onSuccess(WebSocketConnection webSocketConnection) { webSocketConnection.startReadingFrames(); @@ -58,7 +59,7 @@ public void onSuccess(WebSocketConnection webSocketConnection) { @Override public void onError(Throwable t) { - Assert.assertTrue(false, t.getMessage()); + Assert.fail(t.getMessage()); } }); } @@ -71,10 +72,6 @@ public void onMessage(WebSocketTextMessage textMessage) { Assert.assertEquals(textMessage.getSubProtocol(), "xml"); Assert.assertFalse(textMessage.isConnectionSecured()); Assert.assertEquals(textMessage.getTarget(), "/test"); - - // Assert custom headers - Assert.assertEquals(textMessage.getHeader("message-type"), "websocket"); - Assert.assertEquals(textMessage.getHeader("message-sender"), "wso2"); } @Override diff --git a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketPassthroughServerConnectorListener.java b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketPassthroughServerConnectorListener.java index 01cf369f4..4c564e699 100644 --- a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketPassthroughServerConnectorListener.java +++ b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketPassthroughServerConnectorListener.java @@ -23,8 +23,9 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.wso2.transport.http.netty.contract.HttpWsConnectorFactory; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; -import org.wso2.transport.http.netty.contract.websocket.HandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeListener; import org.wso2.transport.http.netty.contract.websocket.WebSocketBinaryMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector; import org.wso2.transport.http.netty.contract.websocket.WebSocketCloseMessage; @@ -35,6 +36,7 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig; import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory; +import org.wso2.transport.http.netty.message.HttpCarbonResponse; import org.wso2.transport.http.netty.util.TestUtil; /** @@ -54,11 +56,11 @@ public void onMessage(WebSocketInitMessage initMessage) { configuration.setAutoRead(false); WebSocketClientConnector clientConnector = connectorFactory.createWsClientConnector(configuration); WebSocketConnectorListener clientConnectorListener = new WebSocketPassthroughClientConnectorListener(); - clientConnector.connect(clientConnectorListener).setHandshakeListener(new HandshakeListener() { + clientConnector.connect(clientConnectorListener).setClientHandshakeListener(new ClientHandshakeListener() { @Override - public void onSuccess(WebSocketConnection clientWebSocketConnection) { - HandshakeFuture serverFuture = initMessage.handshake(); - serverFuture.setHandshakeListener(new HandshakeListener() { + public void onSuccess(WebSocketConnection clientWebSocketConnection, HttpCarbonResponse response) { + ServerHandshakeFuture serverFuture = initMessage.handshake(); + serverFuture.setHandshakeListener(new ServerHandshakeListener() { @Override public void onSuccess(WebSocketConnection serverWebSocketConnection) { WebSocketPassThroughTestConnectionManager.getInstance(). @@ -70,14 +72,14 @@ public void onSuccess(WebSocketConnection serverWebSocketConnection) { @Override public void onError(Throwable t) { logger.error(t.getMessage()); - Assert.assertTrue(false, "Error: " + t.getMessage()); + Assert.fail("Error: " + t.getMessage()); } }); } @Override - public void onError(Throwable t) { - Assert.assertTrue(false, t.getMessage()); + public void onError(Throwable t, HttpCarbonResponse response) { + Assert.fail(t.getMessage()); } }); } diff --git a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketTestServerConnectorListener.java b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketTestServerConnectorListener.java index 1e75f9842..024e40a27 100644 --- a/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketTestServerConnectorListener.java +++ b/components/org.wso2.transport.http.netty/src/test/java/org/wso2/transport/http/netty/websocket/WebSocketTestServerConnectorListener.java @@ -22,8 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture; -import org.wso2.transport.http.netty.contract.websocket.HandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeListener; import org.wso2.transport.http.netty.contract.websocket.WebSocketBinaryMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketCloseMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; @@ -56,9 +56,9 @@ public WebSocketTestServerConnectorListener() { @Override public void onMessage(WebSocketInitMessage initMessage) { - HandshakeFuture future = initMessage.handshake(null, true, 3000); + ServerHandshakeFuture future = initMessage.handshake(null, true, 3000); CountDownLatch countDownLatch = new CountDownLatch(1); - future.setHandshakeListener(new HandshakeListener() { + future.setHandshakeListener(new ServerHandshakeListener() { @Override public void onSuccess(WebSocketConnection webSocketConnection) { connectionList.forEach( @@ -71,7 +71,7 @@ public void onSuccess(WebSocketConnection webSocketConnection) { @Override public void onError(Throwable throwable) { log.error(throwable.getMessage()); - Assert.assertTrue(false, "Error: " + throwable.getMessage()); + Assert.fail("Error: " + throwable.getMessage()); countDownLatch.countDown(); } }); @@ -108,8 +108,8 @@ public void onMessage(WebSocketControlMessage controlMessage) { WebSocketConnection webSocketConnection = controlMessage.getWebSocketConnection(); webSocketConnection.pong(controlMessage.getPayload()).addListener(future -> { if (!future.isSuccess()) { - Assert.assertTrue(false, "Could not send the message. " - + future.cause().getMessage()); + Assert.fail("Could not send the message. " + + future.cause().getMessage()); } }); }