1. Sample program:

Reactor – Netty version:

<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>0.8.10. RELEASE</version>
</dependency>
Copy the code

Sample program:

public class TcpServerApplication {
    public static void main(String[] args) {
        DisposableServer server = TcpServer
                .create()
                .host("127.0.0.1")
                .port(8080) .handle((inbound, outbound) -> inbound.receive().asString().log().then() ) .bindNow(); server.onDispose() .block(); }}public class TcpClientApplication {
    public static void main(String[] args) throws InterruptedException {
        TcpClient client = TcpClient.create()		// 1 TcpClientConnect
                .host("127.0.0.1")			// 2 TcpClientBootstrap
                .port(8080)			// 3 TcpClientBootstrap
                .handle((inbound, outbound) -> outbound.sendString(Mono.just("Hello World!")).then());				// 4 TcpClientDoOn
        client.connectNow();		// 5 Connection
        Thread.sleep(3000); }}Copy the code

TcpServerApplication Output:

[ INFO] (reactor-tcp-nio-2) onSubscribe(FluxHandle.HandleSubscriber)
[ INFO] (reactor-tcp-nio-2) request(unbounded)
[ INFO] (reactor-tcp-nio-2) onNext(Hello World!)
[ INFO] (reactor-tcp-nio-2) cancel()
Copy the code

The basic logic is that the Server is bound to port 8080 and listens for requests. After connecting to the port, the Client sends the string Hello World! ; The Server port prints the request after receiving it.

The following specific source code analysis.

2, TcpClient

TcpClient.create()

public static TcpClient create(a) {
   return create(TcpResources.get());
}

/** * TcpClientConnect */
public static TcpClient create(ConnectionProvider provider) {
	 return new TcpClientConnect(provider);
}

public class TcpResources implements ConnectionProvider.LoopResources {
  final ConnectionProvider defaultProvider;
	final LoopResources      defaultLoops;

	protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) {
		this.defaultLoops = defaultLoops;
		this.defaultProvider = defaultProvider;
	}

  /** * This static method finally returns TcpResources, including: * ConnectionProvider: managing connections * LoopResources: managing threads */
	public static TcpResources get(a) {
    // If not, create TcpResources; Otherwise, return directly to TcpResources
		return getOrCreate(tcpResources, null.null, ON_TCP_NEW,  "tcp");
	}
Copy the code

host()

/** * TcpClientBootstrap * TcpClientBootstrap class has a bootstrapMapper which is a Function: B -> tcputils. updateHost(b, host); When is the apply method of the Function interface executed? You can see that the configure() method of the TcpClientBootstrap class does both, so it's just a matter of when the method is called. * /
public final TcpClient host(String host) {
		Objects.requireNonNull(host, "host");
		return bootstrap(b -> TcpUtils.updateHost(b, host));
}

public final TcpClient bootstrap(Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
		return new TcpClientBootstrap(this, bootstrapMapper);
}

final class TcpClientBootstrap extends TcpClientOperator {

	final Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper;

	TcpClientBootstrap(TcpClient client,
			Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
		super(client);
		this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper");
	}

	@Override
	public Bootstrap configure(a) {
		return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper"); }}Copy the code

port()

/** * is similar to the host(String host) method */
public final TcpClient port(int port) {
		return bootstrap(b -> TcpUtils.updatePort(b, port));
}
Copy the code

handler()

/** * return TcpClientDoOn; * The handler entry is BiFunction and the Apply method is called directly in the doOnConnected method; * BiFunction returns Publisher which also calls subscribe directly; * Therefore, you only need to care when the doOnConnected method's input Consumer is called */
public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
		Objects.requireNonNull(handler, "handler");
		return doOnConnected(c -> {
			if (log.isDebugEnabled()) {
				log.debug(format(c.channel(), "Handler is being applied: {}"), handler);
			}

			Mono.fromDirect(handler.apply((NettyInbound) c, (NettyOutbound) c))
			    .subscribe(c.disposeSubscriber());
		});
}

public final TcpClient doOnConnected(Consumer<? super Connection> doOnConnected) {
		Objects.requireNonNull(doOnConnected, "doOnConnected");
		return new TcpClientDoOn(this.null, doOnConnected, null);
}

final class TcpClientDoOn extends TcpClientOperator implements ConnectionObserver {

	final Consumer<? super Bootstrap>  onConnect;
        // onConnected is the doOnConnected Consumer called in the Handle method
	final Consumer<? super Connection> onConnected;
	final Consumer<? super Connection> onDisconnected;

	TcpClientDoOn(TcpClient client,
			@Nullable Consumer<? super Bootstrap> onConnect,
			@Nullable Consumer<? super Connection> onConnected,
			@Nullable Consumer<? super Connection> onDisconnected) {
                // Inherit the previous TcpClient
		super(client);
		this.onConnect = onConnect;
		this.onConnected = onConnected;
		this.onDisconnected = onDisconnected;
	}

	@Override
	public Bootstrap configure(a) {
		Bootstrap b = source.configure();
		ConnectionObserver observer = BootstrapHandlers.connectionObserver(b);
                // Note that ConnectionObserver is set up here, which will be covered later
		BootstrapHandlers.connectionObserver(b, observer.then(this));	
		return b;
	}

	@Override
	public Mono<? extends Connection> connect(Bootstrap b) {
		if(onConnect ! =null) {
			return source.connect(b)
			             .doOnSubscribe(s -> onConnect.accept(b));
		}
		return source.connect(b);
	}

	@Override
	public void onStateChange(Connection connection, State newState) {
                // onConnected is called here when the connection state changes
		if(onConnected ! =null && newState == State.CONFIGURED) {
			onConnected.accept(connection);
			return;
		}
		if(onDisconnected ! =null) {
			if (newState == State.DISCONNECTING) {
				connection.onDispose(() -> onDisconnected.accept(connection));
			}
			else if(newState == State.RELEASED) { onDisconnected.accept(connection); }}}}Copy the code

connectNow()

// Set timeout to 45s
public final Connection connectNow(a) {
		return connectNow(Duration.ofSeconds(45));
}

public final Connection connectNow(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    try {
      	// Connect () returns Mono
        return Objects.requireNonNull(connect().block(timeout), "aborted");
    }
    catch(IllegalStateException e) { ... }}// Return Mono
public final Mono<? extends Connection> connect() {
    ...
    return connect(b);
}

// start the subscription directly in the block method
public T block(Duration timeout) {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
    return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

final T blockingGet(long timeout, TimeUnit unit) {...if(getCount() ! =0) {
        try {
            if(! await(timeout, unit)) { dispose();// Cancel the subscription over time
                throw new IllegalStateException("Timeout on blocking read for " + timeout + ""+ unit); }}catch (InterruptedException ex) {
            dispose();
            RuntimeException re = Exceptions.propagate(ex);
            //this is ok, as re is always a new non-singleton instance
            re.addSuppressed(new Exception("#block has been interrupted"));
            throwre; }}... }Copy the code

From the above analysis, the actual subscription execution begins in the final connectNow() method. Let’s continue with the CONNECT method.

connect()

public final Mono<? extends Connection> connect() {
    Bootstrap b;
    try {
      	// get the default Bootstrap
        b = configure();
    }
    catch (Throwable t) {
        Exceptions.throwIfJvmFatal(t);
        return Mono.error(t);
    }
    // 2. connect(b)
    return connect(b);
}

public Bootstrap configure(a) {
    return DEFAULT_BOOTSTRAP.clone();
}

static final Bootstrap DEFAULT_BOOTSTRAP =
    new Bootstrap().option(ChannelOption.AUTO_READ, false)             .remoteAddress(InetSocketAddressUtil.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
Copy the code

Continue with the connect(Bootstrap B) method:

// This is an abstract method that many inherited classes implement. Based on the previous code analysis, the TcpClientDoOn class should be called first
public abstract Mono<? extends Connection> connect(Bootstrap b);

/ / TcpClientDoOn class
public Mono<? extends Connection> connect(Bootstrap b) {
    if(onConnect ! =null) {
        return source.connect(b)
                        .doOnSubscribe(s -> onConnect.accept(b));
    }
    // the source represents the previous TcpClient; It is eventually passed to the original TcpClientConnect
    return source.connect(b);
}

/ / TcpClientConnect class
final ConnectionProvider provider;
public Mono<? extends Connection> connect(Bootstrap b) {
    // Populate the properties of b
    if (b.config()
            .group() == null) { TcpClientRunOn.configure(b, LoopResources.DEFAULT_NATIVE, TcpResources.get(), maxConnections ! = -1);
    }
    // Finally call this method
    return provider.acquire(b);
}
Copy the code

ConnectionProvider

The connect method ends up calling a method in the ConnectionProvider class. ConnectionProvider appeared in the previous analysis, where the TcpResources object returned by the tcpresources.get () method contains this property.

// Create a default TcpResources
static <T extends TcpResources> T create(@Nullable T previous, @Nullable LoopResources loops, @Nullable ConnectionProvider provider, String name, BiFunction
       
         onNew)
       ,> {
		if (previous == null) {
			loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
                        / / create ConnectionProvider
			provider = provider == null ? ConnectionProvider.elastic(name) : provider;
		}
		else {
			loops = loops == null ? previous.defaultLoops : loops;
			provider = provider == null ? previous.defaultProvider : provider;
		}
		returnonNew.apply(loops, provider); }}static ConnectionProvider elastic(String name) {
    // The second input parameter PoolFactory is a functional interface, so the object is generated when poolFactory. newPool is called; The generated ChannelPool type is SimpleChannelPool.
		return new PooledConnectionProvider(name,
				(bootstrap, handler, checker) -> new SimpleChannelPool(bootstrap,
						handler,
						checker,
						true.false));
}

final class PooledConnectionProvider implements ConnectionProvider {

	interface PoolFactory {

		ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker);
	}

	final ConcurrentMap<PoolKey, Pool> channelPools;
	final String                       name;
	final PoolFactory                  poolFactory;
	final int                          maxConnections;

	PooledConnectionProvider(String name, PoolFactory poolFactory) {
		this.name = name;
		this.poolFactory = poolFactory;
		this.channelPools = PlatformDependent.newConcurrentHashMap();
		this.maxConnections = -1; }... }Copy the code

Now back to the provider.acquire(b) method, we know that the PooledConnectionProvider method was called.

// Map structure, each (remote address, handler) combination has a connection pool
final ConcurrentMap<PoolKey, Pool> channelPools;
final String                       name;
// Create a ChannelPool using poolFactory
final PoolFactory                  poolFactory;
final int                          maxConnections;

Channelpools.get (holder) * If no connection pool exists, create a new connection pool. Last call disposableAcquire(sink, obs, pool, false); * /
public Mono<Connection> acquire(Bootstrap b) {
    return Mono.create(sink -> {
        Bootstrap bootstrap = b.clone();
	// TODO:
        ChannelOperations.OnSetup opsFactory =
                BootstrapHandlers.channelOperationFactory(bootstrap);
	// TODO:Connect life cycle listeners
        ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
      	// Bootstrap remote address(IP :port)
        NewConnectionProvider.convertLazyRemoteAddress(bootstrap);
      	// Each (remote address, handler) has a Pool
        ChannelHandler handler = bootstrap.config().handler();
        PoolKey holder = newPoolKey(bootstrap.config().remoteAddress(), handler ! =null ? handler.hashCode() : -1);

        Pool pool;
        for(; ;) {// Get it directly
            pool = channelPools.get(holder);
            if(pool ! =null) {
                break;
            }
            // Create a new connection pool if it does not exist
            pool = new Pool(bootstrap, poolFactory, opsFactory);
            if (channelPools.putIfAbsent(holder, pool) == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new client pool [{}] for {}",
                            name,
                            bootstrap.config()
                                    .remoteAddress());
                }
                break;
            }
            // Disable multiple pools
            pool.close();
        }
        disposableAcquire(sink, obs, pool, false);
    });
}

Pool(Bootstrap bootstrap,
				PoolFactory provider,
				ChannelOperations.OnSetup opsFactory) {
			this.bootstrap = bootstrap;
			this.opsFactory = opsFactory;
  		        Create a new connection pool
			this.pool = provider.newPool(bootstrap, this.this);
			this.defaultGroup = bootstrap.config()
			                             .group();
			HEALTHY = defaultGroup.next()
			                      .newSucceededFuture(true);
			UNHEALTHY = defaultGroup.next()
			                        .newSucceededFuture(false);
}
Copy the code

Continue to disposableAcquire method,

static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, Pool pool, boolean retried) {
  	        / / for the Channel
		Future<Channel> f = pool.acquire();
		DisposableAcquire disposableAcquire =
				new DisposableAcquire(sink, f, pool, obs, retried);
  	        / / set the listener, this method will eventually call disposableAcquire. OperationComplete () method, operationComplete () method will be called disposableAcquire. The run ()
		f.addListener(disposableAcquire);
		sink.onCancel(disposableAcquire);
	}

final static class DisposableAcquire
			implements Disposable.GenericFutureListener<Future<Channel> >,ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    DisposableAcquire(MonoSink<Connection> sink,
            Future<Channel> future,
            Pool pool,
            ConnectionObserver obs,
            boolean retried) {
        this.f = future;
        this.pool = pool;
        this.sink = sink;
        this.obs = obs;
        this.retried = retried;
    }

    // When the connection state changes, call obs.onStateChange; The obs is set in the tcpClientdoon.configure () method; So whenever the connection state changes, methods in tcpClient.handle are called
    @Override
    public void onStateChange(Connection connection, State newState) {
        if(newState == State.CONFIGURED) { sink.success(connection); } obs.onStateChange(connection, newState); }... }Copy the code

DisposableAcquire is a listener that listens to the connection, namely Future f = Pool.acquire () in the code above. So what type of f is this? We already know from our previous code analysis that the pool is of type SimpleChannelPool.

public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
              	// This method is called when a new connection is createdhandler.channelCreated(ch); }});this.lastRecentUsed = lastRecentUsed; }}public void channelCreated(Channel ch) { inactiveConnections.incrementAndGet(); .The PooledConnection class implements both the Connection and ConnectionObserver interfaces, i.e. a channel and listener. If the state of a channel subsequently changes, PooledConnection's onStateChange method is called.
			PooledConnection pooledConnection = new PooledConnection(ch, this);
			pooledConnection.bind();
			Bootstrap bootstrap = this.bootstrap.clone();
			BootstrapHandlers.finalizeHandler(bootstrap, opsFactory, pooledConnection);
			ch.pipeline()
			  .addFirst(bootstrap.config()
			                     .handler());
}
Copy the code

Let’s continue with PooledConnection’s onStateChange method.

public void onStateChange(Connection connection, State newState) {
		if (newState == State.DISCONNECTING) {
        ...
			}
  	        // Other states go here
		owner().onStateChange(connection, newState);
}

ConnectionObserver owner(a) {
			ConnectionObserver obs;
			for (;;) {
				obs = channel.attr(OWNER)
				             .get();
				if (obs == null) {
					obs = new PendingConnectionObserver();
				}
				else {
					return obs;
				}
                                / / set the channel. The attr (the OWNER) is a newly created PendingConnectionObserver
                                Call again after / / on () method returns the PendingConnectionObserver directly
				if (channel.attr(OWNER)
				           .compareAndSet(null, obs)) {
					returnobs; }}}final static class PendingConnectionObserver implements ConnectionObserver {

		final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get();

		@Override
		public void onUncaughtException(Connection connection, Throwable error) {
			pendingQueue.add(new Pending(connection, error, null));
		}

		@Override
		public void onStateChange(Connection connection, State newState) {
                        // Put state changes on the wait queue and do nothing else
			pendingQueue.add(new Pending(connection, null, newState));
		}

		static class Pending {
			final Connection connection;
			final Throwable error;
			final State state;

			Pending(Connection connection, @Nullable Throwable error, @Nullable State state) {
				this.connection = connection;
				this.error = error;
				this.state = state; }}}Copy the code

As you can see from the above code, a Channel’s state change ends up in a wait queue, missing calls to notify individual listeners. Continuing back to the DisposableAcquire class, you see that the Runnable interface is also implemented.

final static class DisposableAcquire
			implements Disposable.GenericFutureListener<Future<Channel> >,ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    @Override
    public void onStateChange(Connection connection, State newState) {
        if (newState == State.CONFIGURED) {
            sink.success(connection);
        }
        obs.onStateChange(connection, newState);
    }

    @Override
    public void run(a) {
        Channel c = f.getNow();
        pool.activeConnections.incrementAndGet();
        pool.inactiveConnections.decrementAndGet();
	/ / before the owner set up PendingConnectionObserver () method
        ConnectionObserver current = c.attr(OWNER)
                                        .getAndSet(this);

        if (current instanceof PendingConnectionObserver) {
            PendingConnectionObserver pending = (PendingConnectionObserver)current;
            PendingConnectionObserver.Pending p;
            current = null;
            // The listening connection is closed
            registerClose(c, pool);		
	    // Process events in the wait queue in turn (connection state changes)
            while((p = pending.pendingQueue.poll()) ! =null) {
                if(p.error ! =null) {
                    onUncaughtException(p.connection, p.error);
                }
                else if(p.state ! =null) {
                    // Notify each listeneronStateChange(p.connection, p.state); }}}else if (current == null) {
            registerClose(c, pool);
        }
	// TODO:What's going to go this way?
        if(current ! =null) {
            Connection conn = Connection.from(c);
            if (log.isDebugEnabled()) {
                log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"),
                        pool.activeConnections, pool.inactiveConnections);
            }
            obs.onStateChange(conn, State.ACQUIRED);

            PooledConnection con = conn.as(PooledConnection.class);
            if(con ! =null) { ChannelOperations<? ,? > ops = pool.opsFactory.create(con, con,null);
                if(ops ! =null) {
                    ops.bind();
                    obs.onStateChange(ops, State.CONFIGURED);
                    sink.success(ops);
                }
                else {
                    //already configured, just forward the connectionsink.success(con); }}else {
                //already bound, just forward the connection
                sink.success(conn);
            }
            return;
        }
        //Connected, leave onStateChange forward the event if factory.if(pool.opsFactory == ChannelOperations.OnSetup.empty()) { sink.success(Connection.from(c)); }}}Copy the code

At this point, a few lines of code from the TcpClient sample are almost done.