WebFlux is the reactive web framework introduced in Spring 5. It is fully non-blocking and runs on non-blocking servers such as Netty and Undertow, as well as on Servlet 3.1+ containers. This article introduces how to use WebFlux.

WebFlux vs. non-WebFlux

WebFlux is fully non-blocking. Before WebFlux, we could use DeferredResult and AsyncRestTemplate for non-blocking web communication. Let’s compare the two approaches.

Note: This article does not cover the performance differences between synchronous blocking and asynchronous non-blocking code. Blocking wastes threads, and asynchrony is how we avoid it. Asynchrony improves performance only where there is blocking; where there is none, it can degrade performance because of overhead such as thread scheduling.

The following example simulates a business scenario. The order service exposes an interface for querying order information. To implement it, the service also has to call the warehouse service for warehouse information and the goods service for product information, then filter the products and keep the first five.

OrderService provides the following method:

public void getOrderByRest(DeferredResult<Order> rs, long orderId) {
    // [1]
    Order order = mockOrder(orderId);
    // [2]
    ListenableFuture<ResponseEntity<Warehouse>> warehouseLister = asyncRestTemplate.getForEntity("http://warehouse-service/warehouse/mock/" + order.getWarehouseId(), Warehouse.class);
    ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =
                    asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),
                            HttpMethod.GET, null, new ParameterizedTypeReference<List<Goods>>(){});
    // [3]
    CompletableFuture<ResponseEntity<Warehouse>> warehouseFuture = warehouseLister.completable().exceptionally(err -> {
        logger.warn("get warehouse err", err);
        return new ResponseEntity<>(new Warehouse(), HttpStatus.OK);
    });
    CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {
        logger.warn("get goods err", err);
        return new ResponseEntity<List<Goods>>(new ArrayList<>(), HttpStatus.OK);
    });
    // [4]
    warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes) -> {
        order.setWarehouse(warehouseRes.getBody());
        List<Goods> goods = goodsRes.getBody().stream()
                .filter(g -> g.getPrice() > 10).limit(5)
                .collect(Collectors.toList());
        order.setGoods(goods);
        return order;
    }).whenCompleteAsync((o, err) -> {
        // [5]
        if (err != null) {
            logger.warn("err happen:", err);
            // Report the failure instead of completing the response with a null order.
            rs.setErrorResult(err);
            return;
        }
        rs.setResult(o);
    });
}

  1. Load the order data; here it is mocked.
  2. Request the warehouse and product information through asyncRestTemplate, which returns a ListenableFuture for each call.
  3. Convert each ListenableFuture to a CompletableFuture and attach exception handling, so that one failed request does not fail the whole interface.
  4. Combine the warehouse and product results and assemble the order data.
  5. Return the data through DeferredResult.
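
To try steps [3] to [5] without a running Spring application, here is a minimal, self-contained sketch that uses only CompletableFuture from the JDK. The class name OrderAssemblySketch, the assemble method, and the plain strings and integers standing in for Warehouse and Goods are assumptions made for illustration; they are not part of the original project.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-in for the order assembly; plain JDK types replace the Spring ones.
final class OrderAssemblySketch {

    /** Combines a warehouse name and a list of prices into one summary string. */
    static CompletableFuture<String> assemble(CompletableFuture<String> warehouseCall,
                                              CompletableFuture<List<Integer>> pricesCall) {
        // [3] A failed call is replaced by a default, so one error does not fail the result.
        CompletableFuture<String> warehouse = warehouseCall.exceptionally(err -> "unknown");
        CompletableFuture<List<Integer>> prices =
                pricesCall.exceptionally(err -> Collections.<Integer>emptyList());

        // [4] Runs once both futures have completed; no thread waits in the meantime.
        return warehouse.thenCombine(prices, (w, ps) -> {
            List<Integer> firstFive = ps.stream()
                    .filter(p -> p > 10)
                    .limit(5)
                    .collect(Collectors.toList());
            return w + ":" + firstFive;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("warehouse down"));
        CompletableFuture<List<Integer>> prices =
                CompletableFuture.completedFuture(Arrays.asList(5, 20, 30));
        // [5] The callback plays the role of DeferredResult.setResult.
        assemble(failing, prices).whenComplete((summary, err) -> System.out.println(summary));
    }
}
```

Because the failed warehouse call is replaced by a default value, the combined result still completes normally, which is the purpose of step 3.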

As you can see, the code is cumbersome, and returning data through DeferredResult is quite different from a synchronous interface, which simply returns the value from the method.

There are actually two non-blocking parts here:

  1. AsyncRestTemplate sends the HTTP requests asynchronously: the warehouse service and goods service are called on other threads and a ListenableFuture is returned immediately, so the thread running getOrderByRest is not blocked.
  2. DeferredResult returns the HTTP response asynchronously.

getOrderByRest does not block while waiting for AsyncRestTemplate. It returns immediately, and once the requests complete, the callback sets the value of the DeferredResult, which sends the data back as the HTTP response. Compare this with the following code, which blocks the thread while waiting:

ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();
ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();
order.setWarehouse(warehouseRes.getBody());
order.setGoods(goodsRes.getBody());
return order;

Next, we implement the same interface with WebFlux. Add the dependency to the pom:

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

The service startup class OrderServiceReactive

@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceReactive
{
    public static void main( String[] args )
    {
        new SpringApplicationBuilder(OrderServiceReactive.class).web(WebApplicationType.REACTIVE).run(args);
    }
}

WebApplicationType.REACTIVE starts the application as a WebFlux (reactive) application.

The OrderController implementation is as follows

@GetMapping("/{id}")
public Mono<Order> getById(@PathVariable long id) {
    return service.getOrder(id);
}

Note that the method returns a Mono. Mono and Flux are the asynchronous data stream types provided by Project Reactor, and WebFlux typically uses them as input and output values. When an interface returns a Mono or Flux, Spring knows that it is the result of an asynchronous request. For more on Reactor, see Understanding Reactor Design and Implementation.

The OrderService implementation is as follows

public Mono<Order> getOrder(long orderId) {
    // [1]
    Mono<Order> orderMono = mockOrder(orderId);
    // [2]
    return orderMono.flatMap(o -> {
        // [3]
        Mono<Warehouse> warehouseMono = getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class).onErrorReturn(new Warehouse());
        Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +
                StringUtils.join(o.getGoodsIds(), ","), Goods.class)
                .filter(g -> g.getPrice() > 10)
                .take(5)
                .onErrorReturn(new Goods());
        // [4]
        return warehouseMono.zipWith(goodsFlux.collectList(), (w, gs) -> {
            o.setWarehouse(w);
            o.setGoods(gs);
            return o;
        });
    });
}

private <T> Mono<T> getMono(String url, Class<T> resType) {
    return webClient.get().uri(url).retrieve().bodyToMono(resType);
}

// getFlux is analogous and omitted here

  1. Load the order data; here a Mono is mocked.
  2. flatMap transforms the value inside a Mono into another Mono; here the result is still a Mono<Order>.
  3. Fetch the warehouse and product data. Note that the product filtering and the take-first-five operation can be attached directly to the Flux.
  4. zipWith combines two Monos into a new Mono; here it merges the warehouse and product data into the order and finally returns a Mono<Order>.

As you can see, the code is much cleaner, and the interface returns a Mono, much as a synchronous interface returns data directly, with no need for a utility class such as DeferredResult.

We make the asynchronous request through WebClient, which returns the result as a Mono. Although a Mono is not the data itself (it is a publisher that emits the data only once the response arrives), we can attach logic to it through operators such as filtering, sorting, and combining, as if the data had already been retrieved synchronously. With AsyncRestTemplate, all such logic has to be written in callback functions.
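
The idea of attaching logic before the data exists can be tried out with the JDK alone. The sketch below is only an analogy: it uses CompletableFuture instead of Mono, and DeferredPipelineSketch and cheapestAbove are hypothetical names. One difference to keep in mind is that a Mono is lazy and does nothing until it is subscribed to, whereas a CompletableFuture usually represents work that is already under way.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical analogy: operators are attached to a result that has not arrived yet.
final class DeferredPipelineSketch {

    /** Attaches filter, sort and limit steps to a pending list of prices. */
    static CompletableFuture<List<Integer>> cheapestAbove(CompletableFuture<List<Integer>> pending,
                                                         int minPrice, int limit) {
        return pending.thenApply(prices -> prices.stream()
                .filter(p -> p > minPrice)
                .sorted()
                .limit(limit)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> pending = new CompletableFuture<>(); // no data yet
        CompletableFuture<List<Integer>> result = cheapestAbove(pending, 10, 2);
        System.out.println("done before the data arrives? " + result.isDone());
        pending.complete(Arrays.asList(50, 5, 30, 20)); // the "response" arrives
        System.out.println(result.join());
    }
}
```

The pipeline is fully described before any data is available; it only runs once the pending result completes.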

WebFlux is fully non-blocking, and the combining operators of Mono and Flux are very useful. The method above first obtains the order data and then fetches the warehouse and product data concurrently. If the interface is instead given the order ID, warehouse ID, and product IDs as parameters, we can fetch all three at the same time and then assemble them:

public Mono<Order> getOrder(long orderId, long warehouseId, List<Long> goodsIds) {
    Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWith(getMono("http://warehouse-service/warehouse/mock/" + warehouseId, Warehouse.class), (o,w) -> {
        o.setWarehouse(w);
        return o;
    }).zipWith(getFlux("http://goods-service/goods/mock/list?ids=" +
            StringUtils.join(goodsIds, ","), Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
        o.setGoods(gs);
        return o;
    });
}

If we need to obtain order, warehouse and commodity data sequentially, the implementation is as follows

public Mono<Order> getOrderInLabel(long orderId) {
    Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWhen(o -> getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class), (o, w) -> {
        o.setWarehouse(w);
        return o;
    }).zipWhen(o -> getFlux("http://goods-service/goods/mock/list?ids=" +
                    StringUtils.join(o.getGoodsIds(), ",") + "&label=" + o.getWarehouse().getLabel() , Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
        o.setGoods(gs);
        return o;
    });
}

zipWith requests both Monos at the same time and merges their results, whereas zipWhen waits (without blocking a thread) for the first Mono's value before requesting the second. In orderMono.zipWhen(…).zipWhen(…), the first zipWhen waits for the order data from orderMono and then uses it to build the next Mono. The second zipWhen likewise waits for the Mono built by the first zipWhen before building a new one. That is why the second zipWhen can call o.getWarehouse().getLabel(): the first zipWhen has already fetched the warehouse information.
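
The same contrast exists in the JDK's CompletableFuture, which makes it easy to observe when each request is issued. The following is a rough analogy rather than Reactor code: thenCombine plays the role of zipWith and thenCompose the role of zipWhen, and the names CombineVersusChainSketch, together, and inSequence are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical analogy for zipWith (together) and zipWhen (inSequence).
final class CombineVersusChainSketch {

    /** Like zipWith: both sources are requested up front, then merged. */
    static CompletableFuture<String> together(Supplier<CompletableFuture<String>> first,
                                              Supplier<CompletableFuture<String>> second) {
        CompletableFuture<String> a = first.get();
        CompletableFuture<String> b = second.get(); // requested without waiting for a
        return a.thenCombine(b, (x, y) -> x + "+" + y);
    }

    /** Like zipWhen: the second request is built from the first result. */
    static CompletableFuture<String> inSequence(Supplier<CompletableFuture<String>> first,
                                                Function<String, CompletableFuture<String>> second) {
        return first.get().thenCompose(x -> second.apply(x).thenApply(y -> x + "+" + y));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> order = new CompletableFuture<>();
        CompletableFuture<String> result = inSequence(
                () -> { log.add("request order"); return order; },
                o -> { log.add("request warehouse for " + o);
                       return CompletableFuture.completedFuture("w1"); });
        System.out.println(log); // only the order request has been issued so far
        order.complete("o1");    // the order arrives, which triggers the second request
        System.out.println(log + " -> " + result.join());
    }
}
```

In inSequence the warehouse request cannot even be constructed until the order value exists, which is why it may depend on fields of the order.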

The following sections describe how to use WebFlux, in two parts: the WebFlux server and WebClient.

WebFlux server

Switching the underlying container

By default, WebFlux uses Netty for asynchronous communication on the server. You can switch the underlying container by changing the dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
    <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-reactor-netty</artifactId>
    </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

Annotations

WebFlux supports most Spring MVC annotations, for example:

  1. Mapping: @Controller, @GetMapping, @PostMapping, @PutMapping, @DeleteMapping, @PatchMapping
  2. Parameter binding: @RequestParam, @RequestBody, @RequestHeader, @PathVariable, @RequestAttribute, @SessionAttribute
  3. Result parsing: @ResponseBody, @ModelAttribute

These annotations are used in the same way as in Spring MVC.

Imperative mapping

WebFlux also supports specifying the mappings in code rather than through annotations:

@Bean
public RouterFunction<ServerResponse> monoRouterFunction(InvoiceHandler invoiceHandler) {
    return route()
            .GET("/invoice/{orderId}",  accept(APPLICATION_JSON), invoiceHandler::get)
            .build();
}

A request to “/invoice/{orderId}” is forwarded to the invoiceHandler#get method.

The invoiceHandler#get method is implemented as follows:

public Mono<ServerResponse> get(ServerRequest request) {
    Invoice invoice = new Invoice();
    invoice.setId(999L);
    invoice.setOrderId(Long.parseLong(request.pathVariable("orderId")));
    // The element class must match the published type, Invoice.
    return ok().contentType(APPLICATION_JSON).body(Mono.just(invoice), Invoice.class);
}
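
To show what this kind of mapping boils down to, here is a small, self-contained sketch of a router that matches a path template and passes the path variable to a handler function. It is a toy model built on the JDK only; MiniRouterSketch, get, and dispatch are hypothetical names and do not reflect how RouterFunction is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical mini-router; it mimics the idea of RouterFunction, not its API.
final class MiniRouterSketch {
    private final Map<Pattern, Function<String, String>> routes = new LinkedHashMap<>();

    /** Registers a handler for a template that contains exactly one {variable} segment. */
    MiniRouterSketch get(String template, Function<String, String> handler) {
        // Turn "/invoice/{orderId}" into the regex "/invoice/([^/]+)".
        String regex = template.replaceAll("\\{[^/]+\\}", "([^/]+)");
        routes.put(Pattern.compile(regex), handler);
        return this;
    }

    /** Returns the response of the first matching handler, or empty when nothing matches. */
    Optional<String> dispatch(String path) {
        for (Map.Entry<Pattern, Function<String, String>> route : routes.entrySet()) {
            Matcher m = route.getKey().matcher(path);
            if (m.matches()) {
                return Optional.of(route.getValue().apply(m.group(1)));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        MiniRouterSketch router = new MiniRouterSketch()
                .get("/invoice/{orderId}", orderId -> "{\"id\":999,\"orderId\":" + orderId + "}");
        System.out.println(router.dispatch("/invoice/42").orElse("404"));
    }
}
```

Routes are plain data that is built in code, which is what makes this style easy to compose and to test without an annotation scanner.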

Filter

Filters can be added by implementing the WebFilter interface

@Component
public class TokenCheckFilter implements WebFilter {
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if (!exchange.getRequest().getHeaders().containsKey("token")) {
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.FORBIDDEN);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
            return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"msg\":\"no token\"}".getBytes())));
        } else {
            exchange.getAttributes().put("auth", "true");
            return chain.filter(exchange);
        }
    }
}

The above implementation is a pre-filter: it checks the request token before the handler method is invoked.

A post-filter can be implemented as follows:

@Component
public class LogFilter  implements WebFilter {
    private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // [1]
        logger.info("request before, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
        return chain.filter(exchange)
            .doFinally(s -> {
                // [2]
                logger.info("request after, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
            });
    }
}

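Note that exchange.getResponse() at [1] returns the freshly initialized response, not the response produced after the request has been processed. The processed response is only visible at [2], after the rest of the chain has completed.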
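
Whether a filter acts as a pre-filter or a post-filter comes down to whether its logic runs before or after the rest of the chain completes. The following sketch models that with plain JDK types. Exchange, Chain, Filter, and handle are simplified, hypothetical stand-ins, not the WebFlux interfaces, and the filter order is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a filter chain; not the WebFlux types.
final class FilterChainSketch {
    /** Stands in for ServerWebExchange: request headers plus a mutable response status. */
    static final class Exchange {
        final Map<String, String> headers;
        int status = 200;
        final List<String> log = new ArrayList<>();
        Exchange(Map<String, String> headers) { this.headers = headers; }
    }

    interface Chain { CompletableFuture<Void> next(Exchange exchange); }
    interface Filter { CompletableFuture<Void> filter(Exchange exchange, Chain chain); }

    /** Pre-filter: decides before the handler runs and may short-circuit the chain. */
    static final Filter TOKEN_CHECK = (exchange, chain) -> {
        if (!exchange.headers.containsKey("token")) {
            exchange.status = 403;
            return CompletableFuture.completedFuture(null); // the handler never runs
        }
        return chain.next(exchange);
    };

    /** Post-filter: its logic runs after the rest of the chain has completed. */
    static final Filter LOG = (exchange, chain) ->
            chain.next(exchange).whenComplete(
                    (v, err) -> exchange.log.add("after, status=" + exchange.status));

    /** Wraps the handler in the filters, first filter outermost, and runs the chain. */
    static CompletableFuture<Void> handle(Exchange exchange, List<Filter> filters, Chain handler) {
        Chain chain = handler;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Chain rest = chain;
            chain = ex -> filter.filter(ex, rest);
        }
        return chain.next(exchange);
    }
}
```

With LOG placed before TOKEN_CHECK, a request without a token is rejected with 403, the handler is skipped, and the log filter still sees the final status.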

Exception handling

Define a global exception handler with the @ControllerAdvice and @ExceptionHandler annotations:

@ControllerAdvice
public class ErrorController {
    private static final Logger logger = LoggerFactory.getLogger(ErrorController.class);

    @ResponseBody
    @ExceptionHandler({NullPointerException.class})
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public String nullException(NullPointerException e) {
        logger.error("global err handler", e);
        return "{\"msg\":\"There is a problem\"}";
    }
}

WebFluxConfigurer

You can use WebFluxConfigurer to customize the WebFlux configuration, for example to register custom argument resolvers and message codecs:

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
    public void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {
        configurer.addCustomResolver(new HandlerMethodArgumentResolver() {
            ...
        });
    }

    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.customCodecs().register(new HttpMessageWriter() { ... });
    }
}

configureArgumentResolvers configures the processors used for parameter binding; configureHttpMessageCodecs configures the readers and writers for HTTP request and response messages.

@EnableWebFlux tells Spring to import the Spring WebFlux configuration from WebFluxConfigurationSupport. If your dependencies include spring-boot-starter-webflux, Spring WebFlux is auto-configured and the annotation is not needed (adding it makes Spring Boot back off from its own WebFlux auto-configuration). If you use Spring WebFlux without Spring Boot, however, you need @EnableWebFlux to enable the Spring WebFlux configuration.

Spring WebFlux also supports CORS, Spring Security, and HTTP/2. For more information, refer to the official documentation.

WebClient

WebClient sends asynchronous web requests and supports reactive programming. The following describes how to use it.

Underlying framework

By default, WebClient uses Netty underneath to make asynchronous HTTP requests. We can switch the underlying library, for example to Jetty:

@Bean
public JettyResourceFactory resourceFactory() {
    return new JettyResourceFactory();
}

@Bean
public WebClient webClient() {
    // Jetty's org.eclipse.jetty.client.HttpClient is created with its constructor.
    HttpClient httpClient = new HttpClient();
    ClientHttpConnector connector =
            new JettyClientHttpConnector(httpClient, resourceFactory());
    return WebClient.builder().clientConnector(connector).build();
}

Connection pool

With the default Reactor Netty connector, WebClient takes its connections from a shared default pool. We can configure our own connection pool to control how connections are reused and to tune performance:

ConnectionProvider provider = ConnectionProvider.builder("order")
    .maxConnections(100)
    .maxIdleTime(Duration.ofSeconds(30))
    .pendingAcquireTimeout(Duration.ofMillis(100))  
    .build();
return WebClient
    .builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)));

  1. maxConnections: the maximum number of connections allowed.
  2. maxIdleTime: the maximum time a connection may stay idle.
  3. pendingAcquireTimeout: the maximum time to wait for a connection when none is available.
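
How maxConnections and pendingAcquireTimeout interact can be illustrated with a semaphore. This is a toy model under simplifying assumptions, not Reactor Netty's implementation: BoundedPoolSketch and its methods are hypothetical, only the number of slots is tracked (no real connections), and maxIdleTime is not modelled.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the two limits; it tracks permits, not real connections.
final class BoundedPoolSketch {
    private final Semaphore permits;
    private final Duration pendingAcquireTimeout;

    BoundedPoolSketch(int maxConnections, Duration pendingAcquireTimeout) {
        this.permits = new Semaphore(maxConnections);
        this.pendingAcquireTimeout = pendingAcquireTimeout;
    }

    /** Takes a slot, waiting at most pendingAcquireTimeout; returns false if none frees up. */
    boolean acquire() {
        try {
            return permits.tryAcquire(pendingAcquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Returns a slot to the pool so that a waiting caller can proceed. */
    void release() {
        permits.release();
    }

    public static void main(String[] args) {
        BoundedPoolSketch pool = new BoundedPoolSketch(1, Duration.ofMillis(100));
        System.out.println("first acquire: " + pool.acquire());
        System.out.println("second acquire while exhausted: " + pool.acquire());
        pool.release();
        System.out.println("acquire after release: " + pool.acquire());
    }
}
```

A short pendingAcquireTimeout makes callers fail fast when the pool is exhausted instead of queueing indefinitely.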

Timeout

When Netty is used underneath, you can set timeouts as follows:

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(10))
                .addHandlerLast(new WriteTimeoutHandler(10)));

Or just use responseTimeout

HttpClient httpClient = HttpClient.create()
        .responseTimeout(Duration.ofSeconds(2));
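
What a response timeout means for the caller can be illustrated with CompletableFuture.orTimeout from the JDK (Java 9 or later). This is an analogy only: ResponseTimeoutSketch, withResponseTimeout, and describe are hypothetical names, and a future that never completes stands in for a server that never answers.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical analogy for a response timeout, using JDK futures instead of Reactor Netty.
final class ResponseTimeoutSketch {

    /** Fails the pending result with a TimeoutException if it does not complete in time. */
    static <T> CompletableFuture<T> withResponseTimeout(CompletableFuture<T> pending,
                                                        Duration timeout) {
        return pending.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Waits for the outcome and describes it, so the effect of the timeout is visible. */
    static String describe(CompletableFuture<String> result) {
        return result
                .handle((value, err) -> {
                    if (err == null) {
                        return "got " + value;
                    }
                    return err instanceof TimeoutException ? "timed out" : "failed";
                })
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverAnswers = new CompletableFuture<>();
        System.out.println(describe(withResponseTimeout(neverAnswers, Duration.ofMillis(200))));
    }
}
```

The timeout turns "no answer" into an ordinary error signal, which the usual error-handling operators can then deal with.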

Post JSON

WebClient can send JSON, form, file, and other kinds of request bodies. Here is the most common case, a POST request with a JSON body:

webClient.post().uri("http://localhost:9004/order/")
    .contentType(MediaType.APPLICATION_JSON)
    .body(Mono.just(order), Order.class)
    .retrieve().bodyToMono(String.class)

Exception handling

Exception handling can be specified in ResponseSpec

private <T> Mono<T> getMono(String url, Class<T> resType) {
    return webClient
            .get().uri(url).retrieve()
            .onStatus(HttpStatus::is5xxServerError, clientResponse -> {
                return Mono.error(...);
            })
            .onStatus(HttpStatus::is4xxClientError, clientResponse -> {
                return Mono.error(...);
            })
            .onStatus(HttpStatus::isError, clientResponse -> {
                return Mono.error(...);
            })
            .bodyToMono(resType);
}
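
The decision the onStatus handlers encode is simply a mapping from status code to error. The sketch below shows that mapping in isolation with JDK types only. The exception classes ServerFailure and ClientFailure are hypothetical, since the article does not show what is passed to Mono.error(...), and the sketch assumes that the first matching handler decides the outcome.

```java
import java.util.Optional;

// Hypothetical status-to-error mapping; the exception types are invented for illustration.
final class StatusMappingSketch {
    static final class ServerFailure extends RuntimeException {
        ServerFailure(int status) { super("server error " + status); }
    }

    static final class ClientFailure extends RuntimeException {
        ClientFailure(int status) { super("client error " + status); }
    }

    /** Returns the error for a 4xx or 5xx status, or empty when the body should be decoded. */
    static Optional<RuntimeException> toError(int status) {
        if (status >= 500 && status <= 599) {
            return Optional.of(new ServerFailure(status));
        }
        if (status >= 400 && status <= 499) {
            return Optional.of(new ClientFailure(status));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(toError(503).map(Throwable::getMessage).orElse("decode body"));
        System.out.println(toError(404).map(Throwable::getMessage).orElse("decode body"));
        System.out.println(toError(200).map(Throwable::getMessage).orElse("decode body"));
    }
}
```

Under that first-match assumption, a generic error handler placed after the 5xx and 4xx handlers only acts as a fallback for statuses the earlier ones did not claim.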

It can also be configured on HttpClient

HttpClient httpClient = HttpClient.create()
        .doOnError((req, err) -> {
            log.error("err on request:{}", req.uri(), err);
        }, (res, err) -> {
            log.error("err on response:{}", res.uri(), err);
        });

Synchronous result

The block method blocks the current thread until the request returns:

private <T> T syncGetMono(String url, Class<T> resType) {
    return webClient
            .get().uri(url).retrieve()
            .bodyToMono(resType).block();
}

Obtaining response information

exchangeToMono gives access to the response's headers, status code, and other information:

private <T> Mono<T> getMonoWithInfo(String url, Class<T> resType) {
    return webClient
            .get()
            .uri(url)
            .exchangeToMono(response -> {
                logger.info("request url:{},statusCode:{},headers:{}", url, response.statusCode(), response.headers());
                return response.bodyToMono(resType);
            });
}

Service registry and Ribbon

As it turns out, WebClient supports the Eureka registry and Ribbon forwarding in the same way as RestTemplate. The difference is that @LoadBalanced must be added to a WebClient.Builder bean:

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
    return WebClient.builder();
}
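
Conceptually, a load-balanced client rewrites a logical service name such as goods-service into the address of one registered instance. The sketch below shows that idea with a simple round-robin choice. It is an illustration under stated assumptions, not the Ribbon or Spring Cloud implementation: RoundRobinSketch and resolve are hypothetical names, the registry is a plain map, and only http:// URLs are handled.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side load balancer: service name -> instance, chosen round-robin.
final class RoundRobinSketch {
    private final Map<String, List<String>> registry;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(Map<String, List<String>> registry) {
        this.registry = registry;
    }

    /** Rewrites http://{service}/path to a concrete instance, cycling through the instances. */
    String resolve(String url) {
        String prefix = "http://";
        int slash = url.indexOf('/', prefix.length());
        String service = url.substring(prefix.length(), slash < 0 ? url.length() : slash);
        String path = slash < 0 ? "" : url.substring(slash);
        List<String> instances = registry.get(service);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("no instance registered for " + service);
        }
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return prefix + instances.get(index) + path;
    }

    public static void main(String[] args) {
        Map<String, List<String>> registry = new HashMap<>();
        registry.put("goods-service", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        RoundRobinSketch balancer = new RoundRobinSketch(registry);
        System.out.println(balancer.resolve("http://goods-service/goods/mock/list?ids=1,2"));
        System.out.println(balancer.resolve("http://goods-service/goods/mock/list?ids=1,2"));
    }
}
```

This is why the URLs in the earlier examples can use names such as goods-service instead of host and port.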

IO /spring-fram… Full article code: gitee.com/binecy/bin-…

In real projects, threads block not only on HTTP requests but also on MySQL, Redis, and Kafka requests, among others. When fetching data from these sources, the thread is usually blocked until the data source responds. The strength of reactive Spring is that it also supports non-blocking reactive programming for these data sources. In the next article, we’ll look at non-blocking reactive programming with Redis.
