Skip to content

Commit

Permalink
Fix/conn pool not remove (#289)
Browse files Browse the repository at this point in the history
* fix conn pool not remove

* dev test

* update version
  • Loading branch information
chuailiwu authored Apr 11, 2022
1 parent 0d47a70 commit 8fba4ab
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
24 changes: 13 additions & 11 deletions src/main/java/com/alipay/remoting/DefaultConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -51,48 +52,48 @@
public class DefaultConnectionManager extends AbstractLifeCycle implements ConnectionManager,
Scannable, LifeCycle {

private static final Logger logger = BoltLoggerFactory
.getLogger("CommonDefault");
private static final Logger logger = BoltLoggerFactory
.getLogger("CommonDefault");

/**
* executor to create connections in async way
*/
private ThreadPoolExecutor asyncCreateConnectionExecutor;
private ThreadPoolExecutor asyncCreateConnectionExecutor;

/**
* connection pool initialize tasks
*/
protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;
protected ConcurrentMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;

/**
* heal connection tasks
*/
protected ConcurrentHashMap<String, FutureTask<Integer>> healTasks;
protected ConcurrentMap<String, FutureTask<Integer>> healTasks;

/**
* connection pool select strategy
*/
protected ConnectionSelectStrategy connectionSelectStrategy;
protected ConnectionSelectStrategy connectionSelectStrategy;

/**
* address parser
*/
protected RemotingAddressParser addressParser;
protected RemotingAddressParser addressParser;

/**
* connection factory
*/
protected ConnectionFactory connectionFactory;
protected ConnectionFactory connectionFactory;

/**
* connection event handler
*/
protected ConnectionEventHandler connectionEventHandler;
protected ConnectionEventHandler connectionEventHandler;

/**
* connection event listener
*/
protected ConnectionEventListener connectionEventListener;
protected ConnectionEventListener connectionEventListener;

/**
* Construct with parameters.
Expand Down Expand Up @@ -710,6 +711,7 @@ public ConnectionPool call() throws Exception {
syncCreateNumWhenNotWarmup);
} catch (Exception e) {
pool.removeAllAndTryClose();
connTasks.remove(url.getUniqueKey());
throw e;
}
}
Expand Down Expand Up @@ -914,7 +916,7 @@ public void setConnectionEventListener(ConnectionEventListener connectionEventLi
*
* @return property value of connPools
*/
public ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> getConnPools() {
public ConcurrentMap<String, RunStateRecordedFutureTask<ConnectionPool>> getConnPools() {
return this.connTasks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alipay.remoting.config.BoltServerOption;
import com.alipay.remoting.config.Configuration;
import com.alipay.remoting.ExtendedNettyChannelHandler;
import com.alipay.remoting.exception.RemotingException;
import org.slf4j.Logger;

import com.alipay.remoting.Connection;
Expand Down Expand Up @@ -189,6 +190,8 @@ public Connection createConnection(Url url) throws Exception {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
throw new RemotingException("create connection, but channel is inactive, url is "
+ url.getOriginUrl());
}
return conn;
}
Expand All @@ -204,6 +207,9 @@ public Connection createConnection(String targetIP, int targetPort, int connectT
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
throw new RemotingException(
"create connection, but channel is inactive, target address is " + targetIP + ":"
+ targetPort);
}
return conn;
}
Expand All @@ -219,6 +225,9 @@ public Connection createConnection(String targetIP, int targetPort, byte version
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
throw new RemotingException(
"create connection, but channel is inactive, target address is " + targetIP + ":"
+ targetPort);
}
return conn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Concurrent hash set.
Expand All @@ -27,7 +28,7 @@
* @version $Id: ConcurrentHashSet.java, v 0.1 Mar 11, 2016 3:40:41 PM yunliang.shi Exp $
*/
public class ConcurrentHashSet<E> extends AbstractSet<E> {
private ConcurrentHashMap<E, Boolean> map;
private ConcurrentMap<E, Boolean> map;

/**
* constructor
Expand Down

0 comments on commit 8fba4ab

Please sign in to comment.