Background

  • While trying out WebFlux as an early adopter, I noticed that although it is documented to shut down gracefully, it does not wait for all requests to return before shutting down. If requests are still outstanding (for example, one that sleeps for 10 s), the client immediately gets an "Empty reply". (Environment: Spring Boot 2.1.5 with reactor-netty 0.8.8.)

2020.02.11 Spring Boot 2.2.4 Update

  • After updating to Spring Boot 2.2.4, I found that the previous bug has been fixed: reactorResourceFactory now blocks while it shuts down the Netty resource pool.
  • However, testing showed that the problem still reproduces, with "Empty reply" again. The server does not, as I expected, finish the in-progress business responses before closing all connections.
  • Since I did not know Netty well, I read through a lot of material and found that Netty's graceful shutdown only ensures that no unfinished packets remain in the socket buffer and that no channels are still running on the worker. Its quiet period, too, simply waits a while after closing before the main program is actually killed.
  • So what to do? I found a good idea online: add an in-flight request counter to a WebFilter and block on it before shutdown. Because of bean ordering, the blocking WebFilter bean holds up the reactorResourceFactory shutdown.
  • If the port needs to be closed first (for example, when an upstream TCP health check must see that no new traffic is accepted), get the WebServer from reactiveWebServerApplicationContext and stop it.
  • The code is as follows:
// Inspired by https://github.com/making/demo-graceful-shutdown-webflux/blob/master/src/main/java/com/example/demogracefulshutdownwebflux/DrainFilter.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

import javax.annotation.PreDestroy;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class DrainDownFilter implements WebFilter {

	private final AtomicInteger inFlight = new AtomicInteger(0);

	private final AtomicBoolean inDraining = new AtomicBoolean(false);

	@Autowired
	ReactiveWebServerApplicationContext reactiveWebServerApplicationContext;

	@Override
	public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {

		return webFilterChain.filter(serverWebExchange).doFirst(inFlight::incrementAndGet)
				.doFinally((x) -> inFlight.decrementAndGet());
	}

	@PreDestroy
	void preDestroy() {
		log.warn("Start draining. ({} in-flight requests)", this.inFlight);
		this.inDraining.set(true);
		// Shut down the server
		reactiveWebServerApplicationContext.getWebServer().stop();
		// Poll for up to 30 s
		for (int i = 0; i < 30; i++) {
			int current = this.inFlight.get();
			if (current <= 0) {
				break;
			}
			try {
				log.warn("Draining... ({} in-flight requests)", current);
				Thread.sleep(1000);
			}
			catch (InterruptedException e) {
				// Restore the interrupt flag and stop waiting
				Thread.currentThread().interrupt();
				break;
			}
		}
		log.warn("Good bye. ({} in-flight requests)", this.inFlight);
	}
}
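
The counting-and-draining idea in the filter can be tried out without Spring. The following is a minimal sketch under stated assumptions: InFlightTracker and its methods are hypothetical names made up for illustration, not part of Spring or Reactor. begin() and end() play the role of the doFirst/doFinally callbacks, and awaitDrained() plays the role of the polling loop in preDestroy(), using a deadline instead of a fixed iteration count.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not a Spring or Reactor class): counts in-flight work and waits for it to drain. */
final class InFlightTracker {

    private final AtomicInteger inFlight = new AtomicInteger();

    /** Call when a request starts (the filter does this in doFirst). */
    void begin() {
        inFlight.incrementAndGet();
    }

    /** Call when a request ends for any reason (the filter does this in doFinally). */
    void end() {
        inFlight.decrementAndGet();
    }

    int current() {
        return inFlight.get();
    }

    /**
     * Polls until nothing is in flight or the timeout elapses.
     *
     * @return true if drained, false on timeout or interruption
     */
    boolean awaitDrained(Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out with requests still outstanding
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        InFlightTracker tracker = new InFlightTracker();
        tracker.begin(); // one simulated request is in flight
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // simulated slow handler
            } catch (InterruptedException ignored) {
                // nothing to clean up in this demo
            } finally {
                tracker.end();
            }
        });
        worker.start();
        boolean drained = tracker.awaitDrained(Duration.ofSeconds(2), Duration.ofMillis(20));
        System.out.println("drained=" + drained + ", inFlight=" + tracker.current());
    }
}
```

Returning a boolean lets the caller log whether shutdown proceeded with requests still outstanding, which the filter above reports through its final log line.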

Root cause & solution (outdated)

  • Skipping the analysis and jumping straight to the conclusion:
  • Netty does have a graceful shutdown, and it is invoked on shutdown, but Reactor Netty invokes it in the following way:
//reactor.netty.resources.LoopResources#dispose
@Override
default void dispose() {
	//noop default
	disposeLater().subscribe();
}
  • subscribe() is called directly, without blocking until the disposal completes, so when Spring carries on shutting down, the process is cut off midway, which causes the problem above.
  • So how to solve it?
    • As I am not familiar with WebFlux, I did not know how to wait for all the subscriptions to complete, so I used a hack. After startup, I obtain the LoopResources held inside HttpResources through reflection, then register a shutdown hook that calls the blocking method directly: block() until LoopResources is released, and you're good.
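import java.lang.reflect.Field;
import java.time.Duration;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.stereotype.Component;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpResources;

/**
 * @author Lambda.J
 * @version $Id: GracefulShutdown.java, v 0.1 2019-05-27
 */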
@Component
public class GracefulShutdown {
    @Autowired
    ReactorResourceFactory reactorResourceFactory;

    LoopResources loopResources;

    // In Spring Boot 2.1.5, reactor.netty.resources.LoopResources#dispose only subscribes and does not
    // block waiting for the close, so it is called manually here. Delete this once it is fixed upstream.
    @PostConstruct
    void init() throws Exception {
        Field field = TcpResources.class.getDeclaredField("defaultLoops");
        field.setAccessible(true);
        loopResources = (LoopResources) field.get(reactorResourceFactory.getLoopResources());
        field.setAccessible(false);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Graceful: blocking for up to 20s before the real shutdown!");
            loopResources.disposeLater().block(Duration.ofSeconds(20));
        }));
    }
}
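
The difference between subscribing and blocking can be illustrated with the JDK alone. This is only an analogy under stated assumptions: CompletableFuture stands in for the Mono returned by disposeLater(), and the class and method names (SubscribeVsBlockSketch, fireAndForget, blockUntilDone) are hypothetical. It is not Reactor Netty code. The point is only that a fire-and-forget call returns before the cleanup has finished, while a blocking call with a timeout gives the cleanup a chance to complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Analogy only: CompletableFuture stands in for the Mono returned by disposeLater(). */
final class SubscribeVsBlockSketch {

    /** Hypothetical stand-in for disposeLater(): an async cleanup that needs workMillis to finish. */
    static CompletableFuture<Void> disposeLater(AtomicBoolean released, long workMillis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(workMillis);
                released.set(true); // resources are only released at the very end
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Like disposeLater().subscribe(): starts the cleanup and returns at once. */
    static boolean fireAndForget(long workMillis) {
        AtomicBoolean released = new AtomicBoolean(false);
        disposeLater(released, workMillis);
        return released.get(); // read immediately, before the cleanup has had time to finish
    }

    /** Like disposeLater().block(timeout): waits for the cleanup, up to timeoutMillis. */
    static boolean blockUntilDone(long workMillis, long timeoutMillis) {
        AtomicBoolean released = new AtomicBoolean(false);
        try {
            disposeLater(released, workMillis).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Cleanup failed or took too long; the caller sees released == false.
        }
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("fire-and-forget released: " + fireAndForget(300));
        System.out.println("blocking released:        " + blockUntilDone(300, 2000));
    }
}
```

If the caller is the last thing keeping the process alive, the fire-and-forget variant is the one that can leave the cleanup unfinished, which matches the behaviour described above for dispose().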

Related knowledge

Spring shutdown process

  1. A shutdown hook is registered on startup: org.springframework.context.support.AbstractApplicationContext#registerShutdownHook
  2. The actual shutdown logic: org.springframework.context.support.AbstractApplicationContext#doClose
    1. Begin closing the context
    2. Stop the lifecycle beans
    3. Destroy the singleton beans created by the BeanFactory (NettyServer is closed here)
    4. Close the BeanFactory
    5. Close the DisposableServer
    6. Remove the listeners and set the state to inactive
  3. Testing showed that during bean shutdown, when reactorServerResourceFactory is destroyed (org.springframework.http.client.reactive.ReactorResourceFactory#destroy), the port is closed, but requests that have not yet been answered can still be answered.
    • So another hack is to override the shutdown logic at this point by shadowing the class with a copy of your own, and to wait a while after closing reactorServer. Use this only as a last resort.
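
Why a blocking destroy callback can hold up later shutdown steps can be shown with a toy model. This is a sketch under stated assumptions, not Spring code: DestroyOrderSketch is a hypothetical class that runs destroy callbacks one at a time, most recent registration first, and it assumes (as this post does) that the drain filter is destroyed before reactorResourceFactory. The bean names are used only as labels.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model (not Spring code): destroy callbacks run sequentially, newest registration first. */
final class DestroyOrderSketch {

    private static final class Entry {
        final String name;
        final Runnable onDestroy;

        Entry(String name, Runnable onDestroy) {
            this.name = name;
            this.onDestroy = onDestroy;
        }
    }

    private final Deque<Entry> registered = new ArrayDeque<>();

    /** Registers a bean-like component together with its destroy callback. */
    void register(String name, Runnable onDestroy) {
        registered.push(new Entry(name, onDestroy)); // push/pop gives last-in, first-out order
    }

    /** Runs every destroy callback in turn and returns the order in which they completed. */
    List<String> close() {
        List<String> order = new ArrayList<>();
        while (!registered.isEmpty()) {
            Entry entry = registered.pop();
            entry.onDestroy.run(); // a blocking callback delays everything after it
            order.add(entry.name);
        }
        return order;
    }

    public static void main(String[] args) {
        DestroyOrderSketch context = new DestroyOrderSketch();
        context.register("reactorResourceFactory", () -> System.out.println("releasing event loops"));
        context.register("drainDownFilter", () -> {
            System.out.println("draining in-flight requests...");
            try {
                Thread.sleep(100); // stands in for the polling loop in preDestroy()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("destroy order: " + context.close());
    }
}
```

In this model the event loops are released only after the drain callback returns, which is the effect the WebFilter approach relies on.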

References

  • Netty graceful exit mechanism and principle
  • Spring Boot 2.1.5 Source Code
  • Reactor Netty 0.8.8 Source Code
  • Netty 4.1.36 Source Code