asyncframework

This is an asynchronous framework implemented by ASM+ dynamic bytecode technology. Just add an @asyncFunction annotation to your interface to make this method execute asynchronously!

The @AsyncFunction annotation of the AsyncFramework is the same as the @Async asynchronous annotation of the Spring framework.

A: Why would you want to implement such A framework when Spring already provides such functionality? Q: Because I needed it when I was wrapping components, but I didn’t want to rely on Spring to use this functionality in the project, which would be bloated. Secondly, it is also because I like to toss about and want to realize my ideas.

The AsyncFramework can replace Spring’s @async usage by packaging a starter package that relies on spring’s BeanPostProcess for seamless integration. I don’t want to make wheels. Asyncframework is recommended for non-Spring projects.

Version update

I’m WeChat public in 2019, published an article, if the interface was introduced a method to add an annotation can make the asynchronous execution of methods, but there was no implementation method can also be executed asynchronously with the return value of function, and on this day in 2020, I have put this function implements, are racking their brains to make.

In addition, if you are also interested in bytecode, I highly recommend you to read the source code of this framework, which is the essence of the concentration, more than a dozen classes contain the use of design patterns, bytecode, and framework design ideas, and can better understand how spring @async is implemented.

When implementing the function of supporting asynchronous execution of methods with return values, there are two big problems: difficulty one: how to realize asynchronous execution of methods with return values? Difficulty 2: How to write bytecode to implement the proxy class of the generic interface?

How do I use asyncFramework?

Step 1: Add dependencies to the project

Maven:

<dependency>
  <groupId>com.github.wujiuye</groupId>
  <artifactId>asyncframework</artifactId>
  <version>1.2.0 - RELEASE</version>
</dependency>
Copy the code

gradle:

implementation 'com. Making. Wujiuye: asyncframework: 1.2.0 - RELEASE'
Copy the code

Step 2: Define the interface and write the implementation class of the interface

A: Why do we need to define interfaces? Q: Because the requirement I met before was to use the interface, in fact, it is easier not to use the write interface, I will add it later when I want to play.

Define the interface:

/ * * *@author wujiuye
 * @version1.0 on 2019/11/24 * /
public interface AsyncMessageSubscribe {

    /** * Asynchronous has no return value **@param queue
     */
    @AsyncFunction
    void pullMessage(String queue);

    /** * Asynchronous with return value **@param s1
     * @param s2
     * @return* /
    @AsyncFunction
    AsyncResult<String> doAction(String s1, String s2);

}
Copy the code

Write the implementation class:

 public class Test{

     /** * interface implementation */
    private AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
        @Override
        public void pullMessage(String queue) {
            System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
        }

        @Override
        public AsyncResult<String> doAction(String s1, String s2) {
            System.out.println("s1==>" + s1 + ", s2==>" + s2);
            return new AsyncResult<>("hello wujiuye! current thread name:"+ Thread.currentThread().getName()); }}; }Copy the code

Step 3: Configure the global thread pool

public class Test{
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
Copy the code

Step 4: Create a proxy object using AsyncProxyFactory

Call AsyncProxyFactory getInterfaceImplSupporAsync method to create a proxy class, you need to specify the asynchronous execution which thread pool, as well as the interface implementation class.

 public class AsmProxyTest {
 
     @Test
     public void testAutoProxyAsync(a) throws Exception {
         ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
         AsyncMessageSubscribe proxy = AsmProxyFactory.getInterfaceImplSupporAsync(AsyncMessageSubscribe.class, impl, executorService);
         Async with no return value
         proxy.pullMessage("wujiuye");
         // Async with return value
         AsyncResult<String> asyncResult = proxy.doAction("sssss"."ddd"); System.out.println(asyncResult.get()); System.in.read(); }}Copy the code

A: I also need to create A proxy class to call. Wouldn’t it be easier if I just created A new Runnable and put it into the thread pool? Q: That’s true, but not if proxy objects are created automatically through a packet scan. Spring is implemented through BeanPostProcess.

Implementation principle of asynchronous band return value

In spring projects, if you want to add @async annotations to a method that returns a value, you need the method to return a value of type AsyncResult. I also looked at the spring source code and found that AsyncResult is a Future, but it cannot be implemented just by relying on the Future. Since AsyncResult is returned when the method is finished executing, how do I get this AsyncResult before the method is executed? And the get method that calls AsyncResult gets the result that the final method returns, okay?

The way I think of it is agent, yes, agent agent after agent. I’m also implementing an AsyncResult, which is also a Future, except that the Future’s get method doesn’t block because it doesn’t need to.

/** * Async method return argument **@author wujiuye 2020/03/27
 */
public class AsyncResult<T> implements Future<T> {

    private T result;

    public AsyncResult(T result) {
        this.result = result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled(a) {
        return false;
    }

    @Override
    public boolean isDone(a) {
        return true;
    }

    @Override
    public T get(a) throws InterruptedException, ExecutionException {
        return result;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return get();
    }

    /** ** called by bytecode **@paramFuture is submitted to the thread pool to execute the future * returned@param <T>
     * @return* /
    public static <T> AsyncResult<T> newAsyncResultProxy(final Future<AsyncResult<T>> future) {
        return new AsyncResult<T>(null) {
            @Override
            public T get(a) throws InterruptedException, ExecutionException {
                AsyncResult<T> asyncResult = future.get();
                returnasyncResult.get(); }}; }}Copy the code

The newAsyncResultProxy method, which is invoked on the bytecode-generated proxy object, is the most critical step in the entire asynchronous implementation.

As we know, the Submit method of the ExecutorService supports submitting a task with a Callable return value, and the Submit method returns a Future whose GET method will block until the task completes. So we can proxy AsyncResult’s GET method. When AsyncResult’s GET method is called, it actually calls the Future get method returned by the ExecutorService submit method. The implementation that blocks the result is shielded from the consumer.

As an example, the following test code implements dynamic bytecode, which is how the framework enables asynchronous execution of methods with return values.

public class Test2{

    /** * interface implementation */
    private AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
        @Override
        public void pullMessage(String queue) {
            System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
        }

        @Override
        public AsyncResult<String> doAction(String s1, String s2) {
            System.out.println("s1==>" + s1 + ", s2==>" + s2);
            return new AsyncResult<>("hello wujiuye! current thread name:"+ Thread.currentThread().getName()); }};// Callable submitted to thread pool execution
    public static class AsyncMessageSubscribe_doActionCallable implements Callable<AsyncResult<String>> {
        private AsyncMessageSubscribe target;
        private String param1;
        private String param2;

        public AsyncMessageSubscribe_doActionCallable(AsyncMessageSubscribe var1, String var2, String var3) {
            this.target = var1;
            this.param1 = var2;
            this.param2 = var3;
        }

        public AsyncResult<String> call(a) throws Exception {
            return this.target.doAction(this.param1, this.param2); }}@Test
    public void test2(a) throws ExecutionException, InterruptedException {
        AsyncMessageSubscribe_doActionCallable callable = new AsyncMessageSubscribe_doActionCallable(impl, "wujiuye"."hello"); Future result = executorService.submit(callable); AsyncResult<String> asyncResult = AsyncResult.newAsyncResultProxy(result); System.out.println(asyncResult.get()); }}Copy the code

Some considerations for writing bytecode to implement generic interfaces

The code in the test2 method, which corresponds to the bytecode implementation, is as follows. The source code is in the FutureFunctionHandler class.

public class FutureFunctionHandler implements AsyncFunctionHandler{
        /** * asyncMethod returns a value of type Future for processing **@paramClassWriter class overwriter *@paramInterfaceClass interface *@paramAsyncMethod Asynchronous method *@paramProxyObjClass interface implementation class *@paramType of the executorServiceClass thread pool */
        @Override
        public void doOverrideAsyncFunc(ClassWriter classWriter, Class
        interfaceClass, Method asyncMethod, Class
        proxyObjClass, Class
        executorServiceClass) {...// invoke submit callable
            methodVisitor.visitVarInsn(ALOAD, 0);
            methodVisitor.visitFieldInsn(GETFIELD, ByteCodeUtils.getProxyClassName(proxyObjClass), "executorService", Type.getDescriptor(executorServiceClass));
            methodVisitor.visitVarInsn(ALOAD, index);
            if(! executorServiceClass.isInterface()) { methodVisitor.visitMethodInsn(INVOKEVIRTUAL, executorServiceClass.getName().replace("."."/"),
                        "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), false);
            } else {
                methodVisitor.visitMethodInsn(INVOKEINTERFACE, executorServiceClass.getName().replace("."."/"),
                        "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), true);
            }
            // Store the return value on the operand stack
            methodVisitor.visitVarInsn(ASTORE, ++index);

            // Add another layer of proxy to block and wait for the external masking thread
            methodVisitor.visitVarInsn(ALOAD, index);
            methodVisitor.visitMethodInsn(INVOKESTATIC, AsyncResult.class.getName().replace("."."/"),
                    "newAsyncResultProxy", ByteCodeUtils.getFuncDesc(AsyncResult.class, Future.class),
                    false); methodVisitor.visitInsn(ARETURN); . }}Copy the code

When the thread pool calls AsyncMessageSubscribe_doActionCallable the Callable, it looks for the method

java/util/concurrent/Callable.call:()Ljava.lang.Object;
Copy the code

Because Callable is a generic interface.

This would not work if you changed the signature of the implementation class and the signature of the Call method.

Class signature: Ljava/lang/Object; Ljava/util/concurrent/Callable<Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String; >; >;" Call: () method's signature Lcom/wujiuye asyncframework/handler/async/AsyncResult < Ljava/lang/String; >;Copy the code

Because the compiled descriptor for the generic

is ljava.lang.object; .

Such as AsyncResult generic class. (Select part)

public class AsyncResult<T> implements Future<T> {

    private T result;
    
    @Override
    public T get(a) throws InterruptedException, ExecutionException {
        returnresult; }}Copy the code

AsyncResult Bytecode information compiled by a generic class. (Select part)

public class com.wujiuye.asyncframework.handler.async.AsyncResult<T> implements java.util.concurrent.Future<T> { private  T result; descriptor: Ljava/lang/Object; public T get() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException; descriptor: ()Ljava/lang/Object; Code: 0: aload_0 1: getfield #2 // Field result:Ljava/lang/Object; 4: areturnCopy the code

The descriptor of type T is Ljava/lang/Object; And in the get method, the type descriptor specified by the getField directive is also Ljava/lang/Object; .

The Callable interface is also generic, and the descriptor of the compiled call method is () ljava.lang.object; .

@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result  */ V call() throws Exception; }Copy the code

Therefore, if the Callable interface is implemented through bytecode, the call method should not be set to a method signature. Setting a method signature means changing the method descriptor as well, which will cause the call method in the thread pool to throw an abstract method call error. The reason is that according to the call method descriptor of the Callable interface, the corresponding Call method is not found in its implementation class.