1. Sample program:

Reactor – Netty version:

    <version>0.8.10. RELEASE</version>
Sample program:

public class TcpServerApplication {
    public static void main(String[] args) {
        DisposableServer server = TcpServer
                .port(8080) .handle((inbound, outbound) -> inbound.receive().asString().log().then() ) .bindNow(); server.onDispose() .block(); }}public class TcpClientApplication {
    public static void main(String[] args) throws InterruptedException {
        TcpClient client = TcpClient.create()		// 1 TcpClientConnect
                .host("")			// 2 TcpClientBootstrap
                .port(8080)			// 3 TcpClientBootstrap
                .handle((inbound, outbound) -> outbound.sendString(Mono.just("Hello World!")).then());				// 4 TcpClientDoOn
        client.connectNow();		// 5 Connection
TcpServerApplication Output:

[ INFO] (reactor-tcp-nio-2) onSubscribe(FluxHandle.HandleSubscriber)
[ INFO] (reactor-tcp-nio-2) request(unbounded)
[ INFO] (reactor-tcp-nio-2) onNext(Hello World!)
[ INFO] (reactor-tcp-nio-2) cancel()
The basic logic is that the Server is bound to port 8080 and listens for requests. After connecting to the port, the Client sends the string Hello World! ; The Server port prints the request after receiving it.

The following specific source code analysis.

2, TcpClient


public static TcpClient create(a) {
   return create(TcpResources.get());

/** * TcpClientConnect */
public static TcpClient create(ConnectionProvider provider) {
	 return new TcpClientConnect(provider);

public class TcpResources implements ConnectionProvider.LoopResources {
  final ConnectionProvider defaultProvider;
	final LoopResources      defaultLoops;

	protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) {
		this.defaultLoops = defaultLoops;
		this.defaultProvider = defaultProvider;

  /** * This static method finally returns TcpResources, including: * ConnectionProvider: managing connections * LoopResources: managing threads */
	public static TcpResources get(a) {
    // If not, create TcpResources; Otherwise, return directly to TcpResources
		return getOrCreate(tcpResources, null.null, ON_TCP_NEW,  "tcp");
/** * TcpClientBootstrap * TcpClientBootstrap class has a bootstrapMapper which is a Function: B -> tcputils. updateHost(b, host); When is the apply method of the Function interface executed? You can see that the configure() method of the TcpClientBootstrap class does both, so it's just a matter of when the method is called. * /
public final TcpClient host(String host) {
		Objects.requireNonNull(host, "host");
		return bootstrap(b -> TcpUtils.updateHost(b, host));

public final TcpClient bootstrap(Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
		return new TcpClientBootstrap(this, bootstrapMapper);

final class TcpClientBootstrap extends TcpClientOperator {

	final Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper;

	TcpClientBootstrap(TcpClient client,
			Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
		this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper");

	public Bootstrap configure(a) {
		return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper"); }}Copy the code


/** * is similar to the host(String host) method */
public final TcpClient port(int port) {
		return bootstrap(b -> TcpUtils.updatePort(b, port));
/** * return TcpClientDoOn; * The handler entry is BiFunction and the Apply method is called directly in the doOnConnected method; * BiFunction returns Publisher which also calls subscribe directly; * Therefore, you only need to care when the doOnConnected method's input Consumer is called */
public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
		Objects.requireNonNull(handler, "handler");
		return doOnConnected(c -> {
			if (log.isDebugEnabled()) {
				log.debug(format(c.channel(), "Handler is being applied: {}"), handler);

			Mono.fromDirect(handler.apply((NettyInbound) c, (NettyOutbound) c))

public final TcpClient doOnConnected(Consumer<? super Connection> doOnConnected) {
		Objects.requireNonNull(doOnConnected, "doOnConnected");
		return new TcpClientDoOn(this.null, doOnConnected, null);

final class TcpClientDoOn extends TcpClientOperator implements ConnectionObserver {

	final Consumer<? super Bootstrap>  onConnect;
        // onConnected is the doOnConnected Consumer called in the Handle method
	final Consumer<? super Connection> onConnected;
	final Consumer<? super Connection> onDisconnected;

	TcpClientDoOn(TcpClient client,
			@Nullable Consumer<? super Bootstrap> onConnect,
			@Nullable Consumer<? super Connection> onConnected,
			@Nullable Consumer<? super Connection> onDisconnected) {
                // Inherit the previous TcpClient
		this.onConnect = onConnect;
		this.onConnected = onConnected;
		this.onDisconnected = onDisconnected;

	public Bootstrap configure(a) {
		Bootstrap b = source.configure();
		ConnectionObserver observer = BootstrapHandlers.connectionObserver(b);
                // Note that ConnectionObserver is set up here, which will be covered later
		BootstrapHandlers.connectionObserver(b, observer.then(this));	
		return b;

	public Mono<? extends Connection> connect(Bootstrap b) {
		if(onConnect ! =null) {
			return source.connect(b)
			             .doOnSubscribe(s -> onConnect.accept(b));
		return source.connect(b);

	public void onStateChange(Connection connection, State newState) {
                // onConnected is called here when the connection state changes
		if(onConnected ! =null && newState == State.CONFIGURED) {
		if(onDisconnected ! =null) {
			if (newState == State.DISCONNECTING) {
				connection.onDispose(() -> onDisconnected.accept(connection));
// Set timeout to 45s
public final Connection connectNow(a) {
		return connectNow(Duration.ofSeconds(45));

public final Connection connectNow(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    try {
      	// Connect () returns Mono
        return Objects.requireNonNull(connect().block(timeout), "aborted");
    catch(IllegalStateException e) { ... }}// Return Mono
public final Mono<? extends Connection> connect() {
    return connect(b);

// start the subscription directly in the block method
public T block(Duration timeout) {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);

final T blockingGet(long timeout, TimeUnit unit) {...if(getCount() ! =0) {
        try {
            if(! await(timeout, unit)) { dispose();// Cancel the subscription over time
                throw new IllegalStateException("Timeout on blocking read for " + timeout + ""+ unit); }}catch (InterruptedException ex) {
            RuntimeException re = Exceptions.propagate(ex);
            //this is ok, as re is always a new non-singleton instance
            re.addSuppressed(new Exception("#block has been interrupted"));
From the above analysis, the actual subscription execution begins in the final connectNow() method. Let’s continue with the CONNECT method.


public final Mono<? extends Connection> connect() {
    Bootstrap b;
    try {
      	// get the default Bootstrap
        b = configure();
    catch (Throwable t) {
        return Mono.error(t);
    // 2. connect(b)
    return connect(b);

public Bootstrap configure(a) {
    return DEFAULT_BOOTSTRAP.clone();

static final Bootstrap DEFAULT_BOOTSTRAP =
    new Bootstrap().option(ChannelOption.AUTO_READ, false)             .remoteAddress(InetSocketAddressUtil.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
Continue with the connect(Bootstrap B) method:

// This is an abstract method that many inherited classes implement. Based on the previous code analysis, the TcpClientDoOn class should be called first
public abstract Mono<? extends Connection> connect(Bootstrap b);

/ / TcpClientDoOn class
public Mono<? extends Connection> connect(Bootstrap b) {
    if(onConnect ! =null) {
        return source.connect(b)
                        .doOnSubscribe(s -> onConnect.accept(b));
    // the source represents the previous TcpClient; It is eventually passed to the original TcpClientConnect
    return source.connect(b);

/ / TcpClientConnect class
final ConnectionProvider provider;
public Mono<? extends Connection> connect(Bootstrap b) {
    // Populate the properties of b
    if (b.config()
            .group() == null) { TcpClientRunOn.configure(b, LoopResources.DEFAULT_NATIVE, TcpResources.get(), maxConnections ! = -1);
    // Finally call this method
    return provider.acquire(b);
The connect method ends up calling a method in the ConnectionProvider class. ConnectionProvider appeared in the previous analysis, where the TcpResources object returned by the tcpresources.get () method contains this property.

// Create a default TcpResources
static <T extends TcpResources> T create(@Nullable T previous, @Nullable LoopResources loops, @Nullable ConnectionProvider provider, String name, BiFunction
       ,> {
		if (previous == null) {
			loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
                        / / create ConnectionProvider
			provider = provider == null ? ConnectionProvider.elastic(name) : provider;
		else {
			loops = loops == null ? previous.defaultLoops : loops;
			provider = provider == null ? previous.defaultProvider : provider;
		returnonNew.apply(loops, provider); }}static ConnectionProvider elastic(String name) {
    // The second input parameter PoolFactory is a functional interface, so the object is generated when poolFactory. newPool is called; The generated ChannelPool type is SimpleChannelPool.
		return new PooledConnectionProvider(name,
				(bootstrap, handler, checker) -> new SimpleChannelPool(bootstrap,

final class PooledConnectionProvider implements ConnectionProvider {

	interface PoolFactory {

		ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker);

	final ConcurrentMap<PoolKey, Pool> channelPools;
	final String                       name;
	final PoolFactory                  poolFactory;
	final int                          maxConnections;

	PooledConnectionProvider(String name, PoolFactory poolFactory) {
		this.name = name;
		this.poolFactory = poolFactory;
		this.channelPools = PlatformDependent.newConcurrentHashMap();
Now back to the provider.acquire(b) method, we know that the PooledConnectionProvider method was called.

// Map structure, each (remote address, handler) combination has a connection pool
final ConcurrentMap<PoolKey, Pool> channelPools;
final String                       name;
// Create a ChannelPool using poolFactory
final PoolFactory                  poolFactory;
final int                          maxConnections;

Channelpools.get (holder) * If no connection pool exists, create a new connection pool. Last call disposableAcquire(sink, obs, pool, false); * /
public Mono<Connection> acquire(Bootstrap b) {
    return Mono.create(sink -> {
        Bootstrap bootstrap = b.clone();
	// TODO:
        ChannelOperations.OnSetup opsFactory =
	// TODO:Connect life cycle listeners
        ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
      	// Bootstrap remote address(IP :port)
      	// Each (remote address, handler) has a Pool
        ChannelHandler handler = bootstrap.config().handler();
        PoolKey holder = newPoolKey(bootstrap.config().remoteAddress(), handler ! =null ? handler.hashCode() : -1);

        Pool pool;
        for(; ;) {// Get it directly
            pool = channelPools.get(holder);
            if(pool ! =null) {
            // Create a new connection pool if it does not exist
            pool = new Pool(bootstrap, poolFactory, opsFactory);
            if (channelPools.putIfAbsent(holder, pool) == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new client pool [{}] for {}",
            // Disable multiple pools
        disposableAcquire(sink, obs, pool, false);

Pool(Bootstrap bootstrap,
				PoolFactory provider,
				ChannelOperations.OnSetup opsFactory) {
			this.bootstrap = bootstrap;
			this.opsFactory = opsFactory;
  		        Create a new connection pool
			this.pool = provider.newPool(bootstrap, this.this);
			this.defaultGroup = bootstrap.config()
			HEALTHY = defaultGroup.next()
			UNHEALTHY = defaultGroup.next()
Continue to disposableAcquire method,

static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, Pool pool, boolean retried) {
  	        / / for the Channel
		Future<Channel> f = pool.acquire();
		DisposableAcquire disposableAcquire =
				new DisposableAcquire(sink, f, pool, obs, retried);
  	        / / set the listener, this method will eventually call disposableAcquire. OperationComplete () method, operationComplete () method will be called disposableAcquire. The run ()

final static class DisposableAcquire
			implements Disposable.GenericFutureListener<Future<Channel> >,ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    DisposableAcquire(MonoSink<Connection> sink,
            Future<Channel> future,
            Pool pool,
            ConnectionObserver obs,
            boolean retried) {
        this.f = future;
        this.pool = pool;
        this.sink = sink;
        this.obs = obs;
        this.retried = retried;

    // When the connection state changes, call obs.onStateChange; The obs is set in the tcpClientdoon.configure () method; So whenever the connection state changes, methods in tcpClient.handle are called
    public void onStateChange(Connection connection, State newState) {
        if(newState == State.CONFIGURED) { sink.success(connection); } obs.onStateChange(connection, newState); }... }Copy the code

DisposableAcquire is a listener that listens to the connection, namely Future f = Pool.acquire () in the code above. So what type of f is this? We already know from our previous code analysis that the pool is of type SimpleChannelPool.

public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
              	// This method is called when a new connection is createdhandler.channelCreated(ch); }});this.lastRecentUsed = lastRecentUsed; }}public void channelCreated(Channel ch) { inactiveConnections.incrementAndGet(); .The PooledConnection class implements both the Connection and ConnectionObserver interfaces, i.e. a channel and listener. If the state of a channel subsequently changes, PooledConnection's onStateChange method is called.
			PooledConnection pooledConnection = new PooledConnection(ch, this);
			Bootstrap bootstrap = this.bootstrap.clone();
			BootstrapHandlers.finalizeHandler(bootstrap, opsFactory, pooledConnection);
Let’s continue with PooledConnection’s onStateChange method.

public void onStateChange(Connection connection, State newState) {
		if (newState == State.DISCONNECTING) {
  	        // Other states go here
		owner().onStateChange(connection, newState);

ConnectionObserver owner(a) {
			ConnectionObserver obs;
			for (;;) {
				obs = channel.attr(OWNER)
				if (obs == null) {
					obs = new PendingConnectionObserver();
				else {
					return obs;
                                / / set the channel. The attr (the OWNER) is a newly created PendingConnectionObserver
                                Call again after / / on () method returns the PendingConnectionObserver directly
				if (channel.attr(OWNER)
				           .compareAndSet(null, obs)) {
					returnobs; }}}final static class PendingConnectionObserver implements ConnectionObserver {

		final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get();

		public void onUncaughtException(Connection connection, Throwable error) {
			pendingQueue.add(new Pending(connection, error, null));

		public void onStateChange(Connection connection, State newState) {
                        // Put state changes on the wait queue and do nothing else
			pendingQueue.add(new Pending(connection, null, newState));

		static class Pending {
			final Connection connection;
			final Throwable error;
			final State state;

			Pending(Connection connection, @Nullable Throwable error, @Nullable State state) {
				this.connection = connection;
				this.error = error;
As you can see from the above code, a Channel’s state change ends up in a wait queue, missing calls to notify individual listeners. Continuing back to the DisposableAcquire class, you see that the Runnable interface is also implemented.

final static class DisposableAcquire
			implements Disposable.GenericFutureListener<Future<Channel> >,ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    public void onStateChange(Connection connection, State newState) {
        if (newState == State.CONFIGURED) {
        obs.onStateChange(connection, newState);

    public void run(a) {
        Channel c = f.getNow();
	/ / before the owner set up PendingConnectionObserver () method
        ConnectionObserver current = c.attr(OWNER)

        if (current instanceof PendingConnectionObserver) {
            PendingConnectionObserver pending = (PendingConnectionObserver)current;
            PendingConnectionObserver.Pending p;
            current = null;
            // The listening connection is closed
            registerClose(c, pool);		
	    // Process events in the wait queue in turn (connection state changes)
            while((p = pending.pendingQueue.poll()) ! =null) {
                if(p.error ! =null) {
                    onUncaughtException(p.connection, p.error);
                else if(p.state ! =null) {
                    // Notify each listeneronStateChange(p.connection, p.state); }}}else if (current == null) {
            registerClose(c, pool);
	// TODO:What's going to go this way?
        if(current ! =null) {
            Connection conn = Connection.from(c);
            if (log.isDebugEnabled()) {
                log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"),
                        pool.activeConnections, pool.inactiveConnections);
            obs.onStateChange(conn, State.ACQUIRED);

            PooledConnection con = conn.as(PooledConnection.class);
            if(con ! =null) { ChannelOperations<? ,? > ops = pool.opsFactory.create(con, con,null);
                if(ops ! =null) {
                    obs.onStateChange(ops, State.CONFIGURED);
                else {
                    //already configured, just forward the connectionsink.success(con); }}else {
                //already bound, just forward the connection
        //Connected, leave onStateChange forward the event if factory.if(pool.opsFactory == ChannelOperations.OnSetup.empty()) { sink.success(Connection.from(c)); }}}Copy the code

At this point, a few lines of code from the TcpClient sample are almost done.