Using ModifyRequestBodyGatewayFilterFactory
Read the request parameters
Customize the getRewriteFunction method to get body information
private RewriteFunction<Object, Object> getRewriteFunction() {
    return (serverWebExchange, body) -> {
        // body is the request body argument
        try {
            requestBody.set(JSON.toJSONString(body));
        } catch (Exception e) {
            log.error("Body conversion exception", e);
        }
        return Mono.just(body);
    };
}
Define ModifyRequestBodyGatewayFilterFactory configuration
private ModifyRequestBodyGatewayFilterFactory.Config getConfig() {
    ModifyRequestBodyGatewayFilterFactory.Config cf = new ModifyRequestBodyGatewayFilterFactory.Config();
    cf.setRewriteFunction(Object.class, Object.class, getRewriteFunction());
    return cf;
}
Initialize the delegate filter
@PostConstruct
public void init() {
    this.delegate = new ModifyRequestBodyGatewayFilterFactory().apply(this.getConfig());
}
Read the response message content
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    ServerHttpRequest serverHttpRequest = exchange.getRequest();
    ServerHttpResponse originalResponse = exchange.getResponse();
    // For a POST request, the delegate reads the request body out and writes it back
    HttpMethod method = serverHttpRequest.getMethod();
    DataBufferFactory bufferFactory = originalResponse.bufferFactory();
    ServerHttpResponseDecorator decorator = new ServerHttpResponseDecorator(originalResponse) {
        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            Flux<? extends DataBuffer> fluxBody = Flux.from(body);
            return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                // Join all fragments first so the body is decoded as a whole
                DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                DataBuffer join = dataBufferFactory.join(dataBuffers);
                byte[] content = new byte[join.readableByteCount()];
                join.read(content);
                // Release memory
                DataBufferUtils.release(join);
                String responseData = new String(content, StandardCharsets.UTF_8);
                log.info("response: {}", responseData);
                // Encode explicitly as UTF-8 instead of relying on the platform default charset
                byte[] uppedContent = responseData.getBytes(StandardCharsets.UTF_8);
                return bufferFactory.wrap(uppedContent);
            }));
        }
    };
    // GET requests skip the body-reading delegate
    if (HttpMethod.GET.equals(method)) {
        return chain.filter(exchange.mutate().response(decorator).build());
    }
    return delegate.filter(exchange.mutate().response(decorator).build(), chain);
}
Environment versions
Spring Boot 2.1.5.RELEASE
Spring Cloud Gateway 2.1.3.RELEASE
Why the input stream of HttpServletRequest can only be read once
Let's start by looking at why the InputStream of an HttpServletRequest can only be read once. When we call getInputStream(), the object we get back is actually a ServletInputStream, which extends InputStream.
A stream keeps track of a position that indicates how far it has been read. The position advances on every read() call, and once it reaches the end of the data, read() returns -1 to signal that there is nothing left. To read the data again, you call reset(), which moves the position back to where mark() was last called. If mark() was never called, the mark is at the start of the stream, so the whole stream can be read again. Whether reset() can be used at all depends on whether the concrete stream class overrides it, which is what markSupported() returning true indicates.
InputStream does not implement reset() by default, and markSupported() returns false by default. A stream class that does not override them therefore cannot be rewound.
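To make the mark/reset behavior concrete, here is a minimal plain-JDK sketch. It is not servlet code: ByteArrayInputStream stands in for a stream that supports reset(), and the hypothetical oneShot() helper builds a stream that only implements read(), so it inherits the default InputStream behavior described above. The class and method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class MarkResetDemo {

    /** Reads the stream until read() returns -1 and decodes the bytes as UTF-8. */
    public static String drain(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                out.write(b);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Tries to rewind the stream; returns false when reset() is not supported. */
    public static boolean tryReset(InputStream in) {
        if (!in.markSupported()) {
            return false;
        }
        try {
            in.reset();
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Hypothetical helper: a stream that only implements read(), like the InputStream defaults. */
    public static InputStream oneShot(byte[] data) {
        final ByteArrayInputStream source = new ByteArrayInputStream(data);
        return new InputStream() {
            @Override
            public int read() {
                return source.read();
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);

        // ByteArrayInputStream overrides mark/reset, so it can be read twice
        InputStream rewindable = new ByteArrayInputStream(data);
        System.out.println("first read:  " + drain(rewindable));
        System.out.println("reset ok:    " + tryReset(rewindable));
        System.out.println("second read: " + drain(rewindable));

        // The one-shot stream cannot be rewound, so the second read finds nothing
        InputStream plain = oneShot(data);
        System.out.println("first read:  " + drain(plain));
        System.out.println("reset ok:    " + tryReset(plain));
        System.out.println("second read empty: " + drain(plain).isEmpty());
    }
}
```

The second half of main() mirrors the situation with a request body: once the data has been consumed, nothing is left for the next reader unless someone kept a copy.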
Solution for reading the request parameters in Spring Cloud Gateway
Add a global filter that caches the body, wraps the request in a decorator whose getBody() method is overridden, and passes the wrapped request down the filter chain. When a later filter calls exchange.getRequest().getBody(), the overridden getBody() method is invoked and returns the body data cached the first time. This makes it possible to read the body multiple times.
The order of this filter is set to Ordered.HIGHEST_PRECEDENCE, which gives it the highest priority. The priority is set this high because some built-in filters may also read the body. If one of them runs first, our custom filter fails with an error saying that the body can only be read once:
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279)
at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:129)
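The cache-and-replay idea can be illustrated without Spring. The sketch below is only an analogy in plain JDK terms, not gateway code: the hypothetical CachedBody class consumes a one-shot stream once, keeps the bytes, and hands out a fresh stream on every getBody() call, which is the role the overridden getBody() plays in the filter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CachedBodyDemo {

    /** Reads every remaining byte from the stream. */
    public static byte[] readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical wrapper: consumes the source once and replays the cached bytes. */
    public static final class CachedBody {
        private final byte[] bytes;

        public CachedBody(InputStream oneShotSource) {
            // The only read of the original source happens here
            this.bytes = readAll(oneShotSource);
        }

        /** Each call returns a new stream over the same cached bytes. */
        public InputStream getBody() {
            return new ByteArrayInputStream(bytes);
        }
    }

    public static void main(String[] args) {
        InputStream source = new ByteArrayInputStream(
                "{\"name\":\"demo\"}".getBytes(StandardCharsets.UTF_8));
        CachedBody cached = new CachedBody(source);

        // Two independent consumers both see the full body
        String first = new String(readAll(cached.getBody()), StandardCharsets.UTF_8);
        String second = new String(readAll(cached.getBody()), StandardCharsets.UTF_8);
        System.out.println(first);
        System.out.println(second);

        // The original source itself is exhausted after the first read
        System.out.println(readAll(source).length);
    }
}
```

The trade-off is the same as in the gateway filter: the whole body is held in memory, so this approach suits small request bodies better than large uploads.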
CacheBodyGlobalFilter
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Solves the problem that the POST request body can only be read once.
 * Reads out the original body data, wraps the request in a ServerHttpRequestDecorator,
 * overrides getBody(), and passes the wrapped request down the filter chain.
 * When a later filter calls ServerWebExchange#getRequest()#getBody(), the cached body is returned.
 * <p>
 * This filter must use HIGHEST_PRECEDENCE so that no other filter reads the body first.
 *
 * @author author
 * @date 2021-12-07
 * @see <a href="https://www.codenong.com/j5e82a0f451882573a13/">https://www.codenong.com/j5e82a0f451882573a13/</a>
 */
@Component
public class CacheBodyGlobalFilter implements Ordered, GlobalFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Requests without a Content-Type header are passed through without caching
        if (exchange.getRequest().getHeaders().getContentType() == null) {
            return chain.filter(exchange);
        } else {
            // Join all body fragments into one buffer, then replay it on every getBody() call
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        DataBufferUtils.retain(dataBuffer);
                        Flux<DataBuffer> cachedFlux = Flux
                                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return cachedFlux;
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });
        }
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
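The effect of getOrder() can be sketched in plain Java. This is an illustration only, not how the gateway is implemented: NamedFilter and the filter names are hypothetical stand-ins, filters are simply sorted by ascending order value, and the -1 entry reflects my reading of the "less than -1" comment in ReadBodyFilter, namely that the gateway's response-writing filter sits at order -1. Ordered.HIGHEST_PRECEDENCE in Spring is Integer.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FilterOrderDemo {

    // Same value as Spring's Ordered.HIGHEST_PRECEDENCE
    public static final int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;

    /** Hypothetical stand-in for a filter that exposes getOrder(). */
    public static final class NamedFilter {
        private final String name;
        private final int order;

        public NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }

        public String getName() {
            return name;
        }

        public int getOrder() {
            return order;
        }
    }

    /** Sorts by ascending order value, so the lowest value comes first. */
    public static List<String> executionOrder(List<NamedFilter> filters) {
        List<NamedFilter> sorted = new ArrayList<>(filters);
        sorted.sort(Comparator.comparingInt(NamedFilter::getOrder));
        List<String> names = new ArrayList<>();
        for (NamedFilter filter : sorted) {
            names.add(filter.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        List<NamedFilter> filters = Arrays.asList(
                new NamedFilter("responseWriter", -1),      // assumed order of the response-writing filter
                new NamedFilter("ReadBodyFilter", -2),
                new NamedFilter("someRouteFilter", 0),      // hypothetical filter with default-like order
                new NamedFilter("CacheBodyGlobalFilter", HIGHEST_PRECEDENCE));
        System.out.println(executionOrder(filters));
    }
}
```

With these values the caching filter comes first, so every later filter sees the decorated request, and ReadBodyFilter is placed ahead of the entry at -1.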
ReadBodyFilter.java
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
 * @author author
 * @version 1.0.0
 * @Description Read request parameters
 * @date 2021-10-12 14:34:00
 */
@Slf4j
@Component
public class ReadBodyFilter implements GlobalFilter, Ordered {

    @Override
    public int getOrder() {
        // The order must be less than -1 to read the response body content
        return -2;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return read(exchange, chain);
    }

    @SuppressWarnings({"unchecked", "NullableProblems"})
    private Mono<Void> read(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest serverHttpRequest = exchange.getRequest();
        ServerHttpResponse originalResponse = exchange.getResponse();
        HttpMethod method = serverHttpRequest.getMethod();
        // Request parameters: for a POST request, read the request body from the request
        String requestBodyStr = HttpMethod.POST.equals(method) ? resolveBodyFromRequest(serverHttpRequest) : null;
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();
        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                if (body instanceof Flux) {
                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
                    // buffer() collects all fragments, which handles a response body sent in several pieces
                    return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                        StringBuffer stringBuffer = new StringBuffer();
                        dataBuffers.forEach(dataBuffer -> {
                            byte[] content = new byte[dataBuffer.readableByteCount()];
                            dataBuffer.read(content);
                            DataBufferUtils.release(dataBuffer);
                            try {
                                stringBuffer.append(new String(content, StandardCharsets.UTF_8));
                            } catch (Exception e) {
                                log.error("--list.add--error", e);
                            }
                        });
                        // result is the response body, which can be inspected or modified here
                        String result = stringBuffer.toString();
                        Map<String, String> urlParams = serverHttpRequest.getQueryParams().toSingleValueMap();
                        log.info("Request address: [{}], request parameters: GET: [{}] | POST: [\n{}\n], response data: [\n{}\n]",
                                serverHttpRequest.getURI(), urlParams, requestBodyStr, result);
                        // Encode explicitly as UTF-8 instead of relying on the platform default charset
                        byte[] uppedContent = result.getBytes(StandardCharsets.UTF_8);
                        originalResponse.getHeaders().setContentLength(uppedContent.length);
                        return bufferFactory.wrap(uppedContent);
                    }));
                }
                // If body is not a Flux, pass it through unchanged (not expected to happen)
                return super.writeWith(body);
            }
        };
        return chain.filter(exchange.mutate().response(decoratedResponse).build());
    }

    /**
     * Gets the request body as a string from Flux<DataBuffer>.
     *
     * @return the request body
     */
    private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
        // Get the request body
        Flux<DataBuffer> body = serverHttpRequest.getBody();
        AtomicReference<String> bodyRef = new AtomicReference<>();
        // Relies on the body cached by CacheBodyGlobalFilter, which is emitted as a single buffer on subscribe
        body.subscribe(buffer -> {
            CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
            DataBufferUtils.release(buffer);
            bodyRef.set(charBuffer.toString());
        });
        return bodyRef.get();
    }
}
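One caveat about the response handling in ReadBodyFilter: each DataBuffer is decoded to a string on its own before being appended. If a multi-byte UTF-8 character happens to be split across two buffers, decoding the halves separately corrupts it, whereas joining the bytes first (as the earlier filter variant does with join()) does not. The plain-JDK sketch below demonstrates the difference; the class and method names are hypothetical and byte arrays stand in for DataBuffer fragments.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class ChunkDecodingDemo {

    /** Decodes every chunk separately and concatenates the resulting strings. */
    public static String decodePerChunk(List<byte[]> chunks) {
        StringBuilder sb = new StringBuilder();
        for (byte[] chunk : chunks) {
            sb.append(new String(chunk, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    /** Joins all chunks into one byte array and decodes once. */
    public static String decodeJoined(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Splits the data into two chunks at the given byte index. */
    public static List<byte[]> splitAt(byte[] data, int index) {
        return Arrays.asList(
                Arrays.copyOfRange(data, 0, index),
                Arrays.copyOfRange(data, index, data.length));
    }

    public static void main(String[] args) {
        // "café": the last character takes two bytes in UTF-8
        String original = "caf\u00e9";
        byte[] bytes = original.getBytes(StandardCharsets.UTF_8);

        // Split in the middle of the two-byte character
        List<byte[]> chunks = splitAt(bytes, bytes.length - 1);

        System.out.println("joined matches original:    " + original.equals(decodeJoined(chunks)));
        System.out.println("per-chunk matches original: " + original.equals(decodePerChunk(chunks)));
    }
}
```

For ASCII-only responses both approaches give the same result, so the problem only shows up with multi-byte content and unlucky fragment boundaries.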
Reference: solution ModifyRequestBodyGatewayFilterFactory principle analysis