java
spring boot
webmvc
webflux
reactive
aop
reflect
[Java] An asynchronous callback to a synchronous return
introduce
Download function should be more common function, although a project may not appear much, but basically every project will have, and some download function is actually more complicated, not difficult, but trouble.
So combined with the previous download requirements, I wrote a library to simplify the implementation of the download function
Wouldn’t it be convenient if I said that now you can download any object with a single annotation
@Download(source = "classpath:/download/README.txt")
@GetMapping("/classpath")
public void classpath(a) {}@Download
@GetMapping("/file")
public File file(a) {
return new File("/Users/Shared/README.txt");
}
@Download
@GetMapping("/http")
public String http(a) {
return "http://127.0.0.1:8080/concept-download/image.jpg";
}
Copy the code
Doesn’t feel very different? Listen to one download request I’ve had
We have a platform that manages devices, and then each device will have a QR code image with an HTTP address stored in a field
Now we need to export the compressed package of two-dimensional code pictures of all devices, and the image name needs to add.png suffix to the device name. It is not difficult in terms of requirements, but it is really a bit troublesome
- First I need to look up the list of devices
- Then use the QR code address to download the image and write it to a local cache file
- You need to determine whether the cache already exists before downloading
- Concurrent downloads are required to improve performance
- After all the images are downloaded
- Regenerated into a compressed file
- The input and output streams are then written to the response
Seeing that I had implemented nearly 200 lines of code, it was so long and stinky that a single download feature was such a hassle, I wondered if there was an easier way
My requirements were simple, I thought I would just provide the data to download, such as a file path, a file object, a string of text, an HTTP address, or a mashup of all the previous types, or even an instance of one of our custom classes, and I would leave the rest alone
Is the file path a file or a directory? String text needs to be written to a text file first, okay? How can HTTP resources be downloaded locally? How to compress multiple files? How do I end up in the response? I don’t want to spend my time dealing with this
For example, in my current requirement, I just need to return the list of devices, and I don’t have to worry about anything else
@download (filename = "qr code.zip")
@GetMapping("/download")
public List<Device> download(a) {
return deviceService.all();
}
public class Device {
// Device name
private String name;
// Device QR code
// The annotation indicates that the HTTP address is data to be downloaded
@SourceObject
private String qrCodeUrl;
// Annotations indicate the file name
@SourceName
public String getQrCodeName(a) {
return name + ".png";
}
// omit the other attribute methods
}
Copy the code
Specify file names and file addresses by making some annotations (or implementing some interface) in the Device field, saving time and 199 lines of code
If you’re interested, Github has a more detailed introduction, including advanced usage and the overall architecture
Train of thought
Here is the main design idea of this library, as well as the pits encountered in the middle, you can continue to see if you are interested
In fact, based on the idea at the beginning, I think the function is not much complex, so I decided to open the liver
I just never thought it would be more complicated than I thought (that’s a story later)
basis
First of all, the whole library is based on responsive programming, but it is not fully responsive, just Mono
… Strange combination?
Why is it like this? A big reason is that I just reconstructed the InputStream mode into responsive mode due to the need to be compatible with WebMVC and WebFlux, so this combination appears
This was also the biggest pitfall I encountered, as I had already tuned through the entire servlet-based download process and thought about supporting WebFlux
As you all know, in WebMVC, we can get request and response objects through the RequestContextHolder, but in WebFlux, we can inject them in method parameters
@Download(source = "classpath:/download/README.txt")
@GetMapping("/classpath")
public void classpath(ServerHttpResponse response) {}Copy the code
With Spring’s built-in injection capabilities, we could have gotten the input to the response using AOP, but we always felt it was a bit redundant
Is there a way to get rid of unwanted inputs and get the response object at the same time
/** * sets the current request and response. * *@see ReactiveDownloadHolder
*/
public class ReactiveDownloadFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
return chain.filter(exchange)
// Lower versions use subscriberContext.contextWrite(ctx -> ctx.put(ServerHttpRequest.class, request)) .contextWrite(ctx -> ctx.put(ServerHttpResponse.class, response)); }}/** * to get the current request and response. * *@see ReactiveDownloadFilter
*/
public class ReactiveDownloadHolder {
public static Mono<ServerHttpRequest> getRequest(a) {
// Lower versions use subscriberContext
return Mono.deferContextual(contextView -> Mono.just(contextView.get(ServerHttpRequest.class)));
}
public static Mono<ServerHttpResponse> getResponse(a) {
// Lower versions use subscriberContext
returnMono.deferContextual(contextView -> Mono.just(contextView.get(ServerHttpResponse.class))); }}Copy the code
You can get the response object by adding a WebFilter, but the return value is Mono
Can you use mono.block () to block the object? No, because WebFlux is based on Netty’s non-blocking thread, calling this method will throw an exception
So there’s nothing left to do but refactor the previous code in a responsive manner
architecture
Let’s talk about the overall architecture
For a download request, we can break it down into several steps, taking downloading a compressed package of multiple files as an example
-
First we usually get multiple File paths or corresponding File objects
-
These files are then compressed to produce a compressed file
-
Finally, the compressed file is written to the response
But for the requirements I described above, it’s not a file path or object to start with, it’s an HTTP address, and then one more step is required to download the image before compression
We may need to add additional steps anywhere in the current step for a variety of requirements, so I refer to the implementation of the Spring Cloud Gateway interceptor chain
/** * Download the processor. * /
public interface DownloadHandler extends OrderProvider {
/** * Execute processing. * *@param context {@link DownloadContext}
* @param chain {@link DownloadHandlerChain}
*/
Mono<Void> handle(DownloadContext context, DownloadHandlerChain chain);
}
/** * Download the processing chain. * /
public interface DownloadHandlerChain {
/** * schedule the next download handler. * *@param context {@link DownloadContext}
*/
Mono<Void> next(DownloadContext context);
}
Copy the code
Each step can then implement a separate DownloadHandler, which can be added in any combination between steps
Download context
Based on this, a DownloadContext is used throughout the process to facilitate sharing and passing intermediate results between steps
DownloadContext is also provided and a DownloadContextFactory can be used to customize the context
At the same time provides DownloadContextInitializer and DownloadContextDestroyer used in context initialization and destruction of extended their own logic
Download Type support
The types of data we need to download are variable, such as files, HTTP addresses, and custom class instances as I wanted earlier
So I abstracted all the download objects as Source, representing a download Source, so that the file could be implemented as FileSource, HTTP address could be implemented as HttpSource, and then created by matching the corresponding SourceFactory
For example FileSourceFactory can match File and create FileSource, HttpSourceFactory can match http:// prefix and create HttpSource
/ * * * {@linkThe Source} factory. * /
public interface SourceFactory extends OrderProvider {
/** * Whether the raw data objects that need to be downloaded are supported. * *@paramSource The original data object to download *@param context {@link DownloadContext}
* @returnReturns true */ if supported
boolean support(Object source, DownloadContext context);
/** * create. * *@paramSource The original data object to download *@param context {@link DownloadContext}
* @returnCreate a {@link Source}
*/
Source create(Object source, DownloadContext context);
}
Copy the code
So what about the support for our custom class? I mentioned that we can annotate the class or implement a specific interface, so I will use the way I implement annotations to describe it
In fact, the logic is very simple, as long as you can skillfully use reflection is completely no problem, let’s look at the usage
@download (filename = "qr code.zip")
@GetMapping("/download")
public List<Device> download(a) {
return deviceService.all();
}
public class Device {
// Device name
private String name;
// Device QR code
// The annotation indicates that the HTTP address is data to be downloaded
@SourceObject
private String qrCodeUrl;
// Annotations indicate the file name
@SourceName
public String getQrCodeName(a) {
return name + ".png";
}
// omit the other attribute methods
}
Copy the code
First I define an annotation @sourcemodel on the class to indicate that it needs to be resolved. Then I define an annotation @SourceObject on the field (or method) that needs to be downloaded so that we can get the value of that field (or method) via reflection
The corresponding Source can be created based on the currently supported SourceFactory, then the @sourcename is used to specify the name, and the method (or field) value can also be reflected and set to the created Source by reflection
This gives you the flexibility to support any object type
Concurrent load
For network resources such as HTTP, we need to load them concurrently into local memory or cache files to improve our processing efficiency
Of course I could have executed a thread pool directly, but the concurrency requirements and allocation of resources vary from machine to machine, project to project, and even requirement
So I provided SourceLoader to support custom load logic, you can even use part of the thread pool, part of the coroutine, and the rest of the load
/ * * * {@linkSource} loader. * *@see DefaultSourceLoader
* @see SchedulerSourceLoader
*/
public interface SourceLoader {
/** * Perform load. * *@param source {@link Source}
* @param context {@link DownloadContext}
* @returnLoaded {@link Source}
*/
Mono<Source> load(Source source, DownloadContext context);
}
Copy the code
The compression
Once we have finished loading, we can perform Compression. Again, I define a class Compression as an abstraction for the Compression object
Typically, we create a cache file locally and then write the compressed data to the cache file
However, I hate to configure various paths in the configuration file every time, so I support memory compression when compression, of course, if the file is large or simply generate a cache file
There is also a fully customizable SourceCompressor interface for compression formats, so you don’t have a problem implementing a compression protocol yourself
/ * * * {@linkSource} compressor. * *@see ZipSourceCompressor
*/
public interface SourceCompressor extends OrderProvider {
/** * get the compressed format. * *@returnCompressed format */
String getFormat(a);
/** * Checks whether the compression format is supported. * *@paramFormat Compressed format *@param context {@link DownloadContext}
* @returnReturns true */ if supported
default boolean support(String format, DownloadContext context) {
return format.equalsIgnoreCase(getFormat());
}
/** * this method is called to perform compression if the corresponding format is supported. * *@param source {@link Source}
* @param writer {@link DownloadWriter}
* @param context {@link DownloadContext}
* @return {@link Compression}
*/
Compression compress(Source source, DownloadWriter writer, DownloadContext context);
}
Copy the code
In response to write
I’ve abstracted the response into a DownloadResponse, which is primarily compatible with HttpServletResponse and ServerHttpResponse
Here’s how WebMVC and WebFlux write responses
//HttpServletResponse
response.getOutputStream().write(byte b[], int off, int len);
//ServerHttpResponse
response.writeWith(Publisher<? extends DataBuffer> body);
Copy the code
It was compatible with my head pain, but it worked out
/** * hold {@linkServerHttpResponse} {@linkDownloadResponse}, for WebFlux. * /
@Getter
public class ReactiveDownloadResponse implements DownloadResponse {
private final ServerHttpResponse response;
private OutputStream os;
private Mono<Void> mono;
public ReactiveDownloadResponse(ServerHttpResponse response) {
this.response = response;
}
@Override
public Mono<Void> write(Consumer<OutputStream> consumer) {
if (os == null) {
mono = response.writeWith(Flux.create(fluxSink -> {
try {
os = new FluxSinkOutputStream(fluxSink, response);
consumer.accept(os);
} catch(Throwable e) { fluxSink.error(e); }})); }else {
consumer.accept(os);
}
return mono;
}
@SneakyThrows
@Override
public void flush(a) {
if(os ! =null) { os.flush(); }}@AllArgsConstructor
public static class FluxSinkOutputStream extends OutputStream {
private FluxSink<DataBuffer> fluxSink;
private ServerHttpResponse response;
@Override
public void write(byte[] b) throws IOException {
writeSink(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] bytes = new byte[len];
System.arraycopy(b, off, bytes, 0, len);
writeSink(bytes);
}
@Override
public void write(int b) throws IOException {
writeSink((byte) b);
}
@Override
public void flush(a) {
fluxSink.complete();
}
public void writeSink(byte. bytes) {
DataBuffer buffer = response.bufferFactory().wrap(bytes);
fluxSink.next(buffer);
// There may be a problem here, but there is currently no data that needs to be releasedDataBufferUtils.release(buffer); }}}Copy the code
As long as the last byte[] is written to each other, it can be converted to each other, but it may be a bit more troublesome, requiring interface callbacks
Write FluxSink as an OutputStream, convert byte[] to DataBuffer, call next, and flush to complete. Perfect
Response writing is actually processing input/output streams. Normally, we define a byte[] to cache the read data, so I don’t fix the size of this cache. Instead, DownloadWriter is available to customize input/output streams, including those with specified encoding or Range headers
/** *@linkInputStream} and {@linkOutputStream} writer. * /
public interface DownloadWriter extends OrderProvider {
/** * Whether the writer supports writing. * *@param resource {@link Resource}
* @param range {@link Range}
* @param context {@link DownloadContext}
* @returnReturns true */ if supported
boolean support(Resource resource, Range range, DownloadContext context);
/** * Perform write. * *@param is {@link InputStream}
* @param os {@link OutputStream}
* @param range {@link Range}
* @param charset {@link Charset}
* @paramLength Indicates the total size. The value may be NULL */
default void write(InputStream is, OutputStream os, Range range, Charset charset, Long length) {
write(is, os, range, charset, length, null);
}
/** * Perform write. * *@param is {@link InputStream}
* @param os {@link OutputStream}
* @param range {@link Range}
* @param charset {@link Charset}
* @paramLength Indicates the total size, which may be null *@paramCallback calls back to the current progress and the size of the growth */
void write(InputStream is, OutputStream os, Range range, Charset charset, Long length, Callback callback);
/** * schedule callback. * /
interface Callback {
/** * callback progress. * *@paramCurrent Current value *@paramIncrease */
void onWrite(long current, long increase); }}Copy the code
The event
When I implemented the whole download process, I found that the whole logic was a bit complicated, so I had to find a way to monitor the whole download process
At the beginning, I defined several listeners to call back and forth, but it was not easy to use. Firstly, our entire architecture was designed to be very flexible and extensible, and the defined listener types were few and not easy to expand
When we later added other processes and steps, we had to add new classes of listeners or add methods on top of the original listener class, which was cumbersome
So I came up with the idea of using events to scale more flexibly and defined DownloadEventPublisher for publishing events and DownloadEventListener for listening on events, as well as supporting Spring’s event listening mode
The log
Based on the above event mode, I implemented several download logs on this basis
- Logs for each process
- Load progress updates, compress progress updates, and respond to logs that write progress updates
- Log time spent
These logs print the information of the whole download process in detail and also help me find a lot of bugs
Other pit
Initialization and destruction of the initial context correspond to one step at the beginning and one step at the end respectively, but after I write the response in WebFlux, I find that the destruction of the context does not take place
So I followed the Spring source code and found that the write method returns mono.empty (), which means that the next method is never called after the response has been written, so steps after the response has been written are never called
Finally, context initialization and destruction are separated out, and the destruction method is called at doAfterTerminate
The end of the
This is basically the content, but for the responsive part of the content is not very thorough, and some of the operators are not very good at using, but there are still a lot of advanced use
If you are interested, you can hold a show, and we will update other libraries gradually
Other articles
[Java] An asynchronous callback to a synchronous return