public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.localAddress(9000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(newSomeSocketClientHandler()); }});// The Clinet client starts the process, ready to trace the CONNECT method
ChannelFuture future = bootstrap.connect("localhost".8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup ! =null) { eventLoopGroup.shutdownGracefully(); }}}Copy the code
Bootstrap.java
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
return connect(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
// Data verification
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
public Bootstrap validate(a) {
super.validate();
// There is no core processor, so the work cannot start at all
if (config.handler() == null) {
throw new IllegalStateException("handler not set");
}
return this;
}
/** The required data is verified and the connection service is processed@see #connect()
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// Channel creation, initialization, and registration
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if(! regFuture.isSuccess()) {return regFuture;
}
// Resolve the server address and connect
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if(cause ! =null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); }}});returnpromise; }}Copy the code
AbstractBootstrap.java
// Initialize and register
final ChannelFuture initAndRegister(a) {
Channel channel = null;
try {
/ / create parentChannel
// Create a Channel with no arguments using reflection newInstance
channel = channelFactory.newChannel();
// After the object is created, initialize the channel
init(channel);
} catch (Throwable t) {
if(channel ! =null) { // If the condition is true, the channel was created successfully, but there was a problem during initialization
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
// Close the channel forcibly
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// There is a problem with creating a channel
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// Register parentChannel(select eventLoop from group, bind to channel, create and start the thread)
ChannelFuture regFuture = config().group().register(channel);
if(regFuture.cause() ! =null) {
if (channel.isRegistered()) {
channel.close();
} else{ channel.unsafe().closeForcibly(); }}// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
Copy the code
The trace init() method is an abstract class implemented on both the Clinet side and the Server side to handle business startup processes on different ends at startup time. Note that we are now client startup, so we are tracing the init() in bootstrap.java.
Bootstrap.java
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
// Get pipeline
ChannelPipeline p = channel.pipeline();
// Add the ChannelInitializer handler created in Bootstrap to the pipeline
p.addLast(config.handler());
// Initialize the channel using the options in Bootstrap
finalMap<ChannelOption<? >, Object> options = options0();synchronized (options) {
// Set option to the current channel
setChannelOptions(channel, options, logger);
}
// Use attrs in Bootstrap to initialize a channel
finalMap<AttributeKey<? >, Object> attrs = attrs0();synchronized (attrs) {
for(Entry<AttributeKey<? >, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); }}}Copy the code
AbstractBootstrap.java
static void setChannelOptions(Channel channel, Map
, Object> options, InternalLogger logger)
> {
/ / traverse the options
for(Map.Entry<ChannelOption<? >, Object> e: options.entrySet()) {// Initialize the currently traversed option to channelsetChannelOption(channel, e.getKey(), e.getValue(), logger); }}@SuppressWarnings("unchecked")
private static void setChannelOption(Channel channel, ChannelOption
option, Object value, InternalLogger logger) {
try {
// Write option to channel config
if(! channel.config().setOption((ChannelOption<Object>) option, value)) { logger.warn("Unknown channel option '{}' for channel '{}'", option, channel); }}catch (Throwable t) {
logger.warn(
"Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t); }}Copy the code
That’s it for client initialization and registration, now go back to bootstap.java and continue reading the **doResolveAndConnect()** method.
/** The required data is verified and the connection service is processed@see #connect()
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// Channel creation, initialization, and registration
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if(! regFuture.isSuccess()) {return regFuture;
}
// Resolve the server address and connect
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if(cause ! =null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); }}});returnpromise; }}// Resolve the server address and connect
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
// Get the eventLoop bound to the channel
final EventLoop eventLoop = channel.eventLoop();
// Create an address resolver
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
// What if the current address resolver does not support the address format, or the address has already been resolved? Hard even
// If the connection succeeds, it succeeds; if the connection fails, it fails. Success or failure, the results are written into promises
if(! resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {// Resolver has no idea about what to do with the specified remote address or it's resolved already.
doConnect(remoteAddress, localAddress, promise);
return promise;
}
// Handle the case where the address is not resolved and the address format is supported by the parser
// Resolve the address asynchronously
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
// Handle the case where parsing is complete
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
// If an exception occurs during parsing, close the channel, otherwise connect to the resolved address
if(resolveFailureCause ! =null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
// getNow() gets the parse result from the asynchronous result
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// Wait until the name resolution is finished.
// If parsing is not complete, add listeners to it
resolveFuture.addListener(new FutureListener<SocketAddress>() {
/ / callback
// If an exception occurs during parsing, close the channel, otherwise connect to the resolved address
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if(future.cause() ! =null) {
channel.close();
promise.setFailure(future.cause());
} else{ doConnect(future.getNow(), localAddress, promise); }}}); }catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// The exeuct from the eventLoop of the current Channel connects to the server address asynchronously
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
// If the local address is not configured, the remote connection is made
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
// Prepare to tracechannel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); }}); }Copy the code
Following up on the connection code, abstractchannel.java connect()
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
@Override
public final ChannelFuture connect( SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
// Use the endpoints in pipeline for connection processing
return tail.connect(remoteAddress, localAddress, promise);
}
Copy the code
AbstractChannelHandlerContext.java
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Prepare to trace
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run(a) {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// Prepare to trace
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch(Throwable t) { notifyOutboundHandlerException(t, promise); }}else{ connect(remoteAddress, localAddress, promise); }}Copy the code
DefultChannelPipeline.java
@Override
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
Copy the code
AbstractNioChannel.java
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
}
try {
if(connectPromise ! =null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
// Prepare to trace
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run(a) {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if(connectPromise ! =null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if(connectTimeoutFuture ! =null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null; close(voidPromise()); }}}); }}catch(Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); }}Copy the code
NioSocketChannel.java
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
// Bind the localAddress specified in bootstrap
if(localAddress ! =null) {
doBind0(localAddress);
}
boolean success = false;
try {
// Connect directly to the specified server address, which may or may not succeed at the first time
// If it fails, a connection-ready event for the channel occurs, setting the stage for the next selector selection
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if(! connected) {// If the connection is not successful, it specifies the event it cares about as connection-ready
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if(! success) { doClose(); }}}private void doBind0(SocketAddress localAddress) throws Exception {
// If the JDK version is greater than or equal to 7, use this method to connect
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else{ SocketUtils.bind(javaChannel().socket(), localAddress); }}//end!!!
Copy the code