usingModifyRequestBodyGatewayFilterFactoryRead the request parameters

Customize the getRewriteFunction method to get body information

private RewriteFunction<Object, Object> getRewriteFunction(a) {
    return (serverWebExchange, body) -> {
        // Body is the request body argument
        try {
            requestBody.set(JSON.toJSONString(body));
        } catch (Exception e) {
            log.error("Body conversion exception", e);
        }
        return Mono.just(body);
    };
}
Copy the code

Define ModifyRequestBodyGatewayFilterFactory configuration

private ModifyRequestBodyGatewayFilterFactory.Config getConfig(a) {
    ModifyRequestBodyGatewayFilterFactory.Config cf = new ModifyRequestBodyGatewayFilterFactory.Config();
    cf.setRewriteFunction(Object.class, Object.class, getRewriteFunction());
    return cf;
}
Copy the code

Initialize the

@PostConstruct
public void init(a) {
    this.delegate = new ModifyRequestBodyGatewayFilterFactory().apply(this.getConfig());
}
Copy the code

Read the response message content

@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest serverHttpRequest = exchange.getRequest(); ServerHttpResponse originalResponse = exchange.getResponse(); / / if it is a post request, the request body out, again write HttpMethod method. = serverHttpRequest getMethod (); DataBufferFactory bufferFactory = originalResponse.bufferFactory(); ServerHttpResponseDecorator decorator = new ServerHttpResponseDecorator(originalResponse) { @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { Flux<? extends DataBuffer> fluxBody = Flux.from(body); return super.writeWith(fluxBody.buffer().map(dataBuffers -> { DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); DataBuffer join = dataBufferFactory.join(dataBuffers); byte[] content = new byte[join.readableByteCount()]; join.read(content); Databufferutils.release (join); // Release memory databufferutils.release (join); String responseData = new String(content, StandardCharsets.UTF_8); Log.info (" response: {}", responseData); byte[] uppedContent = new String(responseData.getBytes(), StandardCharsets.UTF_8).getBytes(); return bufferFactory.wrap(uppedContent); })); }}; If (httpmethod.get.equals (method)) {return chain.filter(exchange.mutate().response(decorator).build()); } return delegate.filter(exchange.mutate().response(decorator).build(), chain); }Copy the code

Version of the environment

Spring Boot 2.1.5. RELEASE

Spring Cloud Gateway 2.1.3. RELEASE

The reason why the input stream to HttpServletRequest can only be read once

Let’s start by looking at why the InputStream to HttpServletRequest can only be read once. When we call getInputStream() to get the InputStream, we get an InputStream object, and the actual type is ServletInputStream. It inherits from InputStream.

The read() method of InputStream has a postion inside it that indicates where the current stream was read. The postion moves each time the stream is read, and if it reaches the end of the read, read() returns -1, indicating that it has finished reading. If you want to read it again, you call the reset() method, and position moves to the position where mark was last called. Mark defaults to 0, so you can read it again. The reset() method is called if it has already been overridden, but it is conditional on whether the markSupported() method returns true.

InputStream does not implement reset() by default, and markSupported() also returns false by default

SpringCloudGateway reads the Request parameter solution

Add a global filter, override the getBody method, and pass the wrapped request through the filter chain. When a later filter uses exchange.getrequest ().getBody() to retrieve the body, the overloaded getBody method is called to retrieve the first cached body data. This enables multiple reads of the body.

The order of this filter sets Ordered.HIGHEST_PRECEDENCE, which is the highest precedence filter. The reason why the priority is set so high is that some built-in filters may also read the body, which will result in the error that the body can only be read once in our custom filter:

java.lang.IllegalStateException: Only one connection receive subscriber allowed.

at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279)

at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:129)
Copy the code

CacheBodyGlobalFilter


import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/ * * * to solve the POST Request body problem can only be read once read out * * the original body data using ServerHttpRequestDecorator packaging Request * rewrite getBody () method, Pass the wrapped request through the filter chain * when the filter passes ServerWebExchange#getRequest()#getBody(), * <p> * The HIGHEST_PRECEDENCE filter must be the highest in order to prevent some filters from reading the body * * in advance@author author
 * @dateThe 2021-12-07 * {@see <a href="https://www.codenong.com/j5e82a0f451882573a13/"/>}
 */
@Component
public class CacheBodyGlobalFilter implements Ordered.GlobalFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        if (exchange.getRequest().getHeaders().getContentType() == null) {
            return chain.filter(exchange);
        } else {
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        DataBufferUtils.retain(dataBuffer);
                        Flux<DataBuffer> cachedFlux = Flux
                                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                        ServerHttpRequest	 mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody(a) {
                                returncachedFlux; }};returnchain.filter(exchange.mutate().request(mutatedRequest).build()); }); }}@Override
    public int getOrder(a) {
        returnOrdered.HIGHEST_PRECEDENCE; }}Copy the code

ReadBodyFilter.java


import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/ * * *@author author
 * @version 1.0.0
 * @DescriptionRead Request Request parameters *@dateThe 2021-10-12 14:34:00 * /
@Slf4j
@Component
public class ReadBodyFilter implements GlobalFilter.Ordered {

    @Override
    public int getOrder(a) {
        // The order must be less than -1 to read the response body content
        return -2;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return read(exchange, chain);
    }

    @SuppressWarnings({"unchecked", "NullableProblems"})
    private Mono<Void> read(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest serverHttpRequest = exchange.getRequest();
        ServerHttpResponse originalResponse = exchange.getResponse();

        // If it is a POST request, fetch the request body and then write
        HttpMethod method = serverHttpRequest.getMethod();
        // Request parameters. Post retrieves the request body from the request
        String requestBodyStr = HttpMethod.POST.equals(method) ? resolveBodyFromRequest(serverHttpRequest) : null;
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();

        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                if (body instanceof Flux) {
                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;

                    return super.writeWith(fluxBody.buffer().map(dataBuffers -> {// Resolve the return body fragment transfer
                        StringBuffer stringBuffer = new StringBuffer();
                        dataBuffers.forEach(dataBuffer -> {
                            byte[] content = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(content);
                            DataBufferUtils.release(dataBuffer);
                            try {
                                stringBuffer.append(new String(content, StandardCharsets.UTF_8));
                            } catch (Exception e) {
                                log.error("--list.add--error", e); }}); String result = stringBuffer.toString();// Result is the value of response, which can be modified or viewed at will
                        Map<String, String> urlParams = serverHttpRequest.getQueryParams().toSingleValueMap();

                        log.info("Request address: [{}] request parameters: GET | {} 】 【 POST: [{} \ n \ n], the response data: [{} \ n \ n]", serverHttpRequest.getURI(), urlParams, requestBodyStr, result);

                        byte[] uppedContent = new String(result.getBytes(), StandardCharsets.UTF_8).getBytes();
                        originalResponse.getHeaders().setContentLength(uppedContent.length);
                        return bufferFactory.wrap(uppedContent);
                    }));

                }
                // if body is not a flux. never got there.
                return super.writeWith(body); }};return chain.filter(exchange.mutate().response(decoratedResponse).build());
    }

    /** * The method to get the string from Flux<DataBuffer> **@returnRequest body * /
    private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
        // Get the request body
        Flux<DataBuffer> body = serverHttpRequest.getBody();
        AtomicReference<String> bodyRef = new AtomicReference<>();
        body.subscribe(buffer -> {
            CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
            DataBufferUtils.release(buffer);
            bodyRef.set(charBuffer.toString());
        });
        // Get the Request body
        returnbodyRef.get(); }}Copy the code

Reference: solution ModifyRequestBodyGatewayFilterFactory principle analysis