diff --git a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java index 08d501e6..978bbc59 100644 --- a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java +++ b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; @@ -51,48 +52,48 @@ public class DefaultConnectionManager extends AbstractLifeCycle implements ConnectionManager, Scannable, LifeCycle { - private static final Logger logger = BoltLoggerFactory - .getLogger("CommonDefault"); + private static final Logger logger = BoltLoggerFactory + .getLogger("CommonDefault"); /** * executor to create connections in async way */ - private ThreadPoolExecutor asyncCreateConnectionExecutor; + private ThreadPoolExecutor asyncCreateConnectionExecutor; /** * connection pool initialize tasks */ - protected ConcurrentHashMap> connTasks; + protected ConcurrentMap> connTasks; /** * heal connection tasks */ - protected ConcurrentHashMap> healTasks; + protected ConcurrentMap> healTasks; /** * connection pool select strategy */ - protected ConnectionSelectStrategy connectionSelectStrategy; + protected ConnectionSelectStrategy connectionSelectStrategy; /** * address parser */ - protected RemotingAddressParser addressParser; + protected RemotingAddressParser addressParser; /** * connection factory */ - protected ConnectionFactory connectionFactory; + protected ConnectionFactory connectionFactory; /** * connection event handler */ - protected ConnectionEventHandler connectionEventHandler; + protected ConnectionEventHandler connectionEventHandler; /** * connection event listener */ - protected ConnectionEventListener connectionEventListener; + protected ConnectionEventListener connectionEventListener; /** * Construct with parameters. @@ -710,6 +711,7 @@ public ConnectionPool call() throws Exception { syncCreateNumWhenNotWarmup); } catch (Exception e) { pool.removeAllAndTryClose(); + connTasks.remove(url.getUniqueKey()); throw e; } } @@ -914,7 +916,7 @@ public void setConnectionEventListener(ConnectionEventListener connectionEventLi * * @return property value of connPools */ - public ConcurrentHashMap> getConnPools() { + public ConcurrentMap> getConnPools() { return this.connTasks; } } diff --git a/src/main/java/com/alipay/remoting/connection/AbstractConnectionFactory.java b/src/main/java/com/alipay/remoting/connection/AbstractConnectionFactory.java index f26e4c22..407a6b1f 100644 --- a/src/main/java/com/alipay/remoting/connection/AbstractConnectionFactory.java +++ b/src/main/java/com/alipay/remoting/connection/AbstractConnectionFactory.java @@ -31,6 +31,7 @@ import com.alipay.remoting.config.BoltServerOption; import com.alipay.remoting.config.Configuration; import com.alipay.remoting.ExtendedNettyChannelHandler; +import com.alipay.remoting.exception.RemotingException; import org.slf4j.Logger; import com.alipay.remoting.Connection; @@ -189,6 +190,8 @@ public Connection createConnection(Url url) throws Exception { channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } else { channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED); + throw new RemotingException("create connection, but channel is inactive, url is " + + url.getOriginUrl()); } return conn; } @@ -204,6 +207,9 @@ public Connection createConnection(String targetIP, int targetPort, int connectT channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } else { channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED); + throw new RemotingException( + "create connection, but channel is inactive, target address is " + targetIP + ":" + + targetPort); } return conn; } @@ -219,6 +225,9 @@ public Connection createConnection(String targetIP, int targetPort, byte version channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } else { channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED); + throw new RemotingException( + "create connection, but channel is inactive, target address is " + targetIP + ":" + + targetPort); } return conn; } diff --git a/src/main/java/com/alipay/remoting/util/ConcurrentHashSet.java b/src/main/java/com/alipay/remoting/util/ConcurrentHashSet.java index efe39877..05bfe5d5 100644 --- a/src/main/java/com/alipay/remoting/util/ConcurrentHashSet.java +++ b/src/main/java/com/alipay/remoting/util/ConcurrentHashSet.java @@ -19,6 +19,7 @@ import java.util.AbstractSet; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Concurrent hash set. @@ -27,7 +28,7 @@ * @version $Id: ConcurrentHashSet.java, v 0.1 Mar 11, 2016 3:40:41 PM yunliang.shi Exp $ */ public class ConcurrentHashSet extends AbstractSet { - private ConcurrentHashMap map; + private ConcurrentMap map; /** * constructor