Background
The project needs to integrate with an external system over a socket: our system acts as the client and initiates connections to the external socket server. That is simple in itself, but with concurrency in mind I used Netty, an NIO framework, together with its connection pooling support.
The code
1. Add the Netty dependency to the POM file
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
2. Implement the ChannelPoolHandler interface in a handler class
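import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;

public class NettyChannelPoolHandler implements ChannelPoolHandler {
    @Override
    public void channelReleased(Channel channel) throws Exception {
        System.out.println("channelReleased. Channel ID: " + channel.id());
    }

    @Override
    public void channelAcquired(Channel channel) throws Exception {
        System.out.println("channelAcquired. Channel ID: " + channel.id());
    }

    @Override
    public void channelCreated(Channel ch) throws Exception {
        System.out.println("channelCreated. Channel ID: " + ch.id());
        SocketChannel channel = (SocketChannel) ch;
        channel.config().setKeepAlive(true);
        channel.config().setTcpNoDelay(true);
        channel.pipeline()
                // Heartbeat support: fires an idle event after 3 seconds without reads
                .addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS))
                // Outbound: prepend a 2-byte length field to every message
                .addLast(new LengthFieldPrepender(2))
                // Inbound: split the stream into frames by the 2-byte length field, then strip it
                .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                // ObjectCodec and NettyClientHandler are project classes not shown in this post
                .addLast(new ObjectCodec())
                .addLast(new NettyClientHandler("ping-pong"));
    }
}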
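The two length-field handlers in the pipeline above agree on a simple wire format: a 2-byte length followed by the payload. As a rough illustration only, here is a minimal sketch of that format using plain java.nio, with no Netty types. The class name LengthPrefixFraming and its methods are hypothetical, and the sketch assumes a big-endian length field at offset 0 that is stripped on decode; it is not how Netty implements the handlers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: mirrors the wire format, not Netty's code.
final class LengthPrefixFraming {

    // Prepend a 2-byte big-endian length to the payload.
    static byte[] encode(byte[] payload) {
        if (payload.length > 65535) {
            throw new IllegalArgumentException("payload does not fit a 2-byte length field");
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        return buf.array();
    }

    // Read one frame and return the payload without its length prefix,
    // or null (leaving the buffer position untouched) if the frame is incomplete.
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < 2) {
            return null;
        }
        in.mark();
        int length = in.getShort() & 0xFFFF; // treat the short as unsigned
        if (in.remaining() < length) {
            in.reset();
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello socket".getBytes(StandardCharsets.UTF_8));
        byte[] payload = decode(ByteBuffer.wrap(frame));
        System.out.println(frame.length + " bytes on the wire, payload: "
                + new String(payload, StandardCharsets.UTF_8));
    }
}
```

Returning null for an incomplete frame stands in for the "wait for more bytes" behavior a streaming decoder needs, because TCP does not preserve message boundaries.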
3. Connection pool
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPoolMap;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;

// Client connection pool.
// Singleton: poolMap is initialized on the first call to getInstance().
public class NettyPoolClient {
    private static NettyPoolClient instance;

    private NettyPoolClient() {
    }

    // synchronized so that concurrent first calls cannot build two instances
    public static synchronized NettyPoolClient getInstance() {
        if (instance == null) {
            instance = new NettyPoolClient();
            instance.build();
        }
        return instance;
    }

    public ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap;
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap strap = new Bootstrap();

    public void build() {
        // ChannelOption.TCP_NODELAY disables Nagle's algorithm so that small packets
        // are sent immediately, which reduces latency.
        // ChannelOption.SO_KEEPALIVE enables TCP keepalive probes; with default OS
        // settings the peer is probed after about two hours of idle time.
        strap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true);

        poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(InetSocketAddress key) {
                // NettyChannelPoolHandler implements ChannelPoolHandler and overrides the
                // channel created, channel acquired, and channel released callbacks.
                return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), 2);
            }
        };
    }
}
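Because the client is a lazily created singleton that may be hit by concurrent requests, the first call needs to be safe under contention. One common option is the initialization-on-demand holder idiom, sketched below with a hypothetical class name (LazyRegistry) and without any Netty types. The counter exists only to make the single construction observable; this is one possible approach, not the only one.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, for illustration only.
final class LazyRegistry {
    // Counts constructor calls so the "built exactly once" property can be checked.
    static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

    private LazyRegistry() {
        CONSTRUCTIONS.incrementAndGet();
        // Expensive setup (such as building a pool map) would go here.
    }

    // The JVM initializes Holder only when getInstance() first touches it,
    // and class initialization is guaranteed to happen once, even under contention.
    private static final class Holder {
        static final LazyRegistry INSTANCE = new LazyRegistry();
    }

    static LazyRegistry getInstance() {
        return Holder.INSTANCE;
    }
}
```

Compared with a synchronized getInstance(), this avoids taking a lock on every call while still creating the instance lazily.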
4. Test classes
// This is actually a controller method.
// Future and FutureListener are Netty's io.netty.util.concurrent types, not java.util.concurrent.
public String connectSocketServer(String host, String port) {
    InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));
    // If poolMap has no pool for this address yet, get() creates one and stores it.
    SimpleChannelPool pool = NettyPoolClient.getInstance().poolMap.get(inetSocketAddress);
    // Acquire a channel from the pool, write the payload, then release the channel.
    Future<Channel> f = pool.acquire();
    f.addListener((FutureListener<Channel>) f1 -> {
        if (f1.isSuccess()) {
            Channel ch = f1.getNow();
            ch.writeAndFlush("hello socket");
            pool.release(ch);
        }
    });
    return "success";
}
5. Start testing
In the output, the channel ID is the same for both requests, which indicates that the connection pool is reusing the connection.
Source code analysis
The code above meets the requirement, but as engineers we should stay curious: since we are using it, we might as well explore how it is implemented.
Process analysis
FixedChannelPool
We'll start with FixedChannelPool, the class we used to create the pool. It lives in the io.netty.channel.pool package:
public class FixedChannelPool extends SimpleChannelPool
Look familiar? It should: we used SimpleChannelPool as the value type parameter of poolMap. FixedChannelPool could be used as the type parameter too, since the two classes are parent and child. Because they are related by inheritance, they share some characteristics:
- They are both pools.
- They both implement the ChannelPool interface, whose main methods are acquire(), release(Channel) and close(). These methods operate on connections, which is exactly what a pool has to do.
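To make the acquire/release/close contract concrete, here is a minimal sketch of a pool with those three operations, written against plain Java collections rather than Netty. The class name SimplePool and its synchronous signatures are assumptions made for illustration; Netty's ChannelPool methods are asynchronous and return futures.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical, synchronous stand-in for the pool contract; not Netty's implementation.
final class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private boolean closed;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse an idle item if there is one, otherwise create a new one.
    synchronized T acquire() {
        if (closed) {
            throw new IllegalStateException("pool is closed");
        }
        T item = idle.pollLast();
        return item != null ? item : factory.get();
    }

    // Hand the item back so that a later acquire() can reuse it.
    synchronized void release(T item) {
        if (!closed) {
            idle.offerLast(item);
        }
    }

    // Drop all idle items and refuse further acquires.
    synchronized void close() {
        closed = true;
        idle.clear();
    }
}
```

Acquiring, releasing, and acquiring again returns the same item, which is the same effect as seeing an identical channel ID on two consecutive requests.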
In Java, a subclass inherits the non-private fields and methods of its parent class and usually adds functionality of its own, so let's look at the difference:
SimpleChannelPool implements the basic connection pool functionality. FixedChannelPool adds more powerful features, such as a maximum pool size and an acquire timeout.
Let's focus on FixedChannelPool:
// These are all FixedChannelPool constructors.
public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, int maxConnections) {
    this(bootstrap, handler, maxConnections, 2147483647);
}

public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
    this(bootstrap, handler, ChannelHealthChecker.ACTIVE, (FixedChannelPool.AcquireTimeoutAction)null, -1L, maxConnections, maxPendingAcquires);
}

public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, FixedChannelPool.AcquireTimeoutAction action, long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires) {
    this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
}

public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, FixedChannelPool.AcquireTimeoutAction action, long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires, boolean releaseHealthCheck) {
    this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, releaseHealthCheck, true);
}

// Here!!
public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, FixedChannelPool.AcquireTimeoutAction action, long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires, boolean releaseHealthCheck, boolean lastRecentUsed) {
    super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
    this.pendingAcquireQueue = new ArrayDeque();
    if (maxConnections < 1) {
        throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
    } else if (maxPendingAcquires < 1) {
        throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
    } else {
        if (action == null && acquireTimeoutMillis == -1L) {
            this.timeoutTask = null;
            this.acquireTimeoutNanos = -1L;
        } else {
            if (action == null && acquireTimeoutMillis != -1L) {
                throw new NullPointerException("action");
            }
            if (action != null && acquireTimeoutMillis < 0L) {
                throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
            }
            this.acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
            switch(action) {
            case FAIL:
                this.timeoutTask = new FixedChannelPool.TimeoutTask() {
                    public void onTimeout(FixedChannelPool.AcquireTask task) {
                        task.promise.setFailure(FixedChannelPool.TIMEOUT_EXCEPTION);
                    }
                };
                break;
            case NEW:
                this.timeoutTask = new FixedChannelPool.TimeoutTask() {
                    public void onTimeout(FixedChannelPool.AcquireTask task) {
                        task.acquired();
                        FixedChannelPool.super.acquire(task.promise);
                    }
                };
                break;
            default:
                throw new Error();
            }
        }
        this.executor = bootstrap.config().group().next();
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
    }
}
The parameters of the last constructor:
- FixedChannelPool.AcquireTimeoutAction: an enum. NEW: if no connection becomes available within the timeout, a new one is created. FAIL: if no connection becomes available within the timeout, the acquire fails with an exception.
- acquireTimeoutMillis: the maximum time, in milliseconds, to wait when acquiring a connection from the pool.
- maxConnections: the maximum number of connections.
- maxPendingAcquires: the maximum number of acquire requests allowed to wait once the maximum number of connections is in use.
- lastRecentUsed: true takes connections from the tail of the queue (the most recently released one); false takes them from the head of the queue.
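The interplay of maxConnections, maxPendingAcquires and lastRecentUsed can be sketched with plain Java collections. The class below (BoundedPool, a hypothetical name) is a simplified model under stated assumptions: it uses CompletableFuture instead of Netty promises, leaves out acquire timeouts and health checks, and is not Netty's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical simplified model of a fixed-size pool; timeouts are omitted.
final class BoundedPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Deque<CompletableFuture<T>> pending = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final int maxConnections;
    private final int maxPendingAcquires;
    private final boolean lastRecentUsed;
    private int acquired; // number of items currently handed out

    BoundedPool(Supplier<T> factory, int maxConnections, int maxPendingAcquires, boolean lastRecentUsed) {
        if (maxConnections < 1 || maxPendingAcquires < 1) {
            throw new IllegalArgumentException("limits must be >= 1");
        }
        this.factory = factory;
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
        this.lastRecentUsed = lastRecentUsed;
    }

    synchronized CompletableFuture<T> acquire() {
        if (acquired < maxConnections) {
            acquired++;
            // Tail = most recently released, head = least recently released.
            T item = lastRecentUsed ? idle.pollLast() : idle.pollFirst();
            return CompletableFuture.completedFuture(item != null ? item : factory.get());
        }
        CompletableFuture<T> promise = new CompletableFuture<>();
        if (pending.size() >= maxPendingAcquires) {
            // The wait queue is full as well: reject immediately.
            promise.completeExceptionally(new IllegalStateException("too many pending acquires"));
        } else {
            pending.offerLast(promise); // wait until someone releases
        }
        return promise;
    }

    synchronized void release(T item) {
        CompletableFuture<T> waiter = pending.pollFirst();
        if (waiter != null) {
            waiter.complete(item); // pass the item straight to the oldest waiter
        } else {
            acquired--;
            idle.offerLast(item);
        }
    }
}
```

With maxConnections = 2 and maxPendingAcquires = 1, the third acquire waits, the fourth fails at once, and releasing an item completes the waiting acquire.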
ChannelPoolMap
ChannelPoolMap is an interface, and its implementation is AbstractChannelPoolMap. The pools are stored in a ConcurrentHashMap, a map that supports concurrent access. Take a look at the get() method we are using:
public final P get(K key) {
    P pool = (ChannelPool)this.map.get(ObjectUtil.checkNotNull(key, "key"));
    if (pool == null) {
        pool = this.newPool(key);
        P old = (ChannelPool)this.map.putIfAbsent(key, pool);
        if (old != null) {
            pool.close();
            pool = old;
        }
    }
    return pool;
}
Does reading this code clear things up? When a key arrives and the map has no pool for it, a new connection pool is created and stored with putIfAbsent. If another thread has stored a pool for the same key in the meantime, the newly created pool is closed and the existing one is used instead. Either way, a connection pool is returned for us to use.
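The same get-or-create pattern can be reproduced with nothing but java.util.concurrent. The sketch below uses hypothetical names (PoolMap, ClosablePool) and a factory function in place of the abstract newPool method; it only illustrates the putIfAbsent handling described above.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical minimal pool type: all the map needs is a way to close the losing pool.
interface ClosablePool {
    void close();
}

// Hypothetical stand-in for the pool map; not Netty's class.
final class PoolMap<K, P extends ClosablePool> {
    private final ConcurrentMap<K, P> map = new ConcurrentHashMap<>();
    private final Function<K, P> newPool;

    PoolMap(Function<K, P> newPool) {
        this.newPool = newPool;
    }

    P get(K key) {
        P pool = map.get(Objects.requireNonNull(key, "key"));
        if (pool == null) {
            pool = newPool.apply(key);
            // putIfAbsent is atomic: only one thread's pool ends up in the map.
            P old = map.putIfAbsent(key, pool);
            if (old != null) {
                // Another thread won the race, so discard ours and use theirs.
                pool.close();
                pool = old;
            }
        }
        return pool;
    }
}
```

Two threads may both create a pool for the same key, but only one is kept and the other is closed right away, so callers always share a single pool per address.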
Conclusion
I didn't expect this blog post to take nearly two hours to write. My back is sore and my neck hurts. But we have to keep asking questions and understand not only what works but why it works. We have to believe that technology can change the world! Good night.