This article was first published in the author's column: segmentfault.com/blog/camile

This article is participating in the “Java Theme Month – Java Development in Action” event.

Version  Date        Notes
1.0      2017.12.10  First published
1.1      2020.9.6    Proposed some reference solutions to the defects
1.2      2021.7.13   Added diagrams
1.3      2021.8.10   Updated the improvement plan

Preface

In ZStack, the basic unit of execution is not only a function but also a Task, which essentially implements Java's Callable interface. ZStack consumes these tasks in parallel on a reasonably sized thread pool. This is what lets ZStack, an IaaS software, run smoothly in large data centers.
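To make this concrete, here is a minimal sketch of a task abstraction built on Callable. The Task interface below carries only the getName() method that also appears in the ZStack snippets in this article. It is a simplified, hypothetical stand-in, not ZStack's actual definition. TaskDemo, the task name and the pool size of 4 are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for ZStack's Task: a Callable that also has a name.
interface Task<T> extends Callable<T> {
    String getName();
}

class TaskDemo {
    // Submits one named task to a fixed-size pool and blocks until its result is ready.
    static int runOnce() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Task<Integer> task = new Task<Integer>() {
                @Override
                public String getName() {
                    return "sum-1-to-10";
                }

                @Override
                public Integer call() {
                    int sum = 0;
                    for (int i = 1; i <= 10; i++) {
                        sum += i;
                    }
                    return sum;
                }
            };
            // To the pool, a Task is just a Callable.
            Future<Integer> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints 55
    }
}
```

Because Task extends Callable, any ExecutorService can run it directly. The extra metadata (a name here, and sync signatures and levels in the classes discussed below) is what ZStack's scheduling layer adds on top.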

If you are not familiar with thread pools, you can read my blog post "Java Multithreading Notes (3): Thread Pools".

Demo code

Here, the most commonly used methods of ZStack's ThreadFacade serve as examples.

syncSubmit

Submits a synchronous task: the thread waits until the task has finished and produced its result before it proceeds to the next task.

Here is ZStack's ApiMediatorImpl, which contains a piece of logic for scheduling API messages:

    @Override
    public void handleMessage(final Message msg) {
        thdf.syncSubmit(new SyncTask<Object>() {
            @Override
            public String getSyncSignature() {
                return "api.worker";
            }

            @Override
            public int getSyncLevel() {
                return apiWorkerNum;
            }

            @Override
            public String getName() {
                return "api.worker";
            }

            @MessageSafe
            public void handleMessage(Message msg) {
                if (msg instanceof APIIsReadyToGoMsg) {
                    handle((APIIsReadyToGoMsg) msg);
                } else if (msg instanceof APIGetVersionMsg) {
                    handle((APIGetVersionMsg) msg);
                } else if (msg instanceof APIGetCurrentTimeMsg) {
                    handle((APIGetCurrentTimeMsg) msg);
                } else if (msg instanceof APIMessage) {
                    dispatchMessage((APIMessage) msg);
                } else {
                    logger.debug("Not an APIMessage.Message ID is " + msg.getId());
                }
            }

            @Override
            public Object call() throws Exception {
                handleMessage(msg);
                return null;
            }
        });
    }

Each API message is consumed by one of the threads of the "api.worker" queue, with at most 5 of them running concurrently (apiWorkerNum = 5). Each thread waits until the API message has been handled and then returns the reply to the user.

chainSubmit

Submits an asynchronous task. The queue does not wait for the task's thread to return a result. Instead, it moves on to the next task in the queue once the task explicitly calls chain.next().

Consider the VmInstanceBase code for stopping, rebooting and starting virtual machines:

    // Stop the VM
    protected void handle(final APIStopVmInstanceMsg msg) {
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getName() {
                return String.format("stop-vm-%s", self.getUuid());
            }

            @Override
            public String getSyncSignature() {
                return syncThreadName;
            }

            @Override
            public void run(SyncTaskChain chain) {
                stopVm(msg, chain);
            }
        });
    }

    // Reboot the VM
    protected void handle(final APIRebootVmInstanceMsg msg) {
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getName() {
                return String.format("reboot-vm-%s", self.getUuid());
            }

            @Override
            public String getSyncSignature() {
                return syncThreadName;
            }

            @Override
            public void run(SyncTaskChain chain) {
                rebootVm(msg, chain);
            }
        });
    }

    // Start the VM
    protected void handle(final APIStartVmInstanceMsg msg) {
        thdf.chainSubmit(new ChainTask(msg) {
            @Override
            public String getName() {
                return String.format("start-vm-%s", self.getUuid());
            }

            @Override
            public String getSyncSignature() {
                return syncThreadName;
            }

            @Override
            public void run(SyncTaskChain chain) {
                startVm(msg, chain);
            }
        });
    }

General features

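getSyncSignature specifies the key of the task's queue, and the queues are essentially kept in a Map. Tasks with the same key are appended, in order, to the queue stored as that key's value. From the business-logic perspective, this effectively prevents a virtual machine's state from becoming confused. The stop, reboot and start handlers above all return the same syncThreadName, so those operations are executed in submission order rather than concurrently.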

ChainTask has a default sync level (concurrency) of 1, which means tasks with the same signature run strictly one at a time. We will see this later in the source code analysis.

Implementation

First, take a look at the method signatures in the ThreadFacade interface:

public interface ThreadFacade extends Component {
    <T> Future<T> submit(Task<T> task); // Submit a task to the thread pool
    
    <T> Future<T> syncSubmit(SyncTask<T> task); // Submit a task to the sync queue of its signature; it returns a value
    
    Future<Void> chainSubmit(ChainTask task); // Submit a task to the chain queue of its signature; it returns no value
    
    Future<Void> submitPeriodicTask(PeriodicTask task, long delay); // Submit a periodic task whose first run starts after the given delay
    
    Future<Void> submitPeriodicTask(PeriodicTask task); // Submit a periodic task
    
    Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task); // Submit a periodic task that can be cancelled
    
    Future<Void> submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay); // Submit a cancelable periodic task whose first run starts after the given delay
    
    void registerHook(ThreadAroundHook hook);  // Register the hook
    
    void unregisterHook(ThreadAroundHook hook); // Cancel the hook
    
    ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay); // Submit a task that expires after a certain amount of time

    void submitTimerTask(TimerTask task, TimeUnit unit, long delay); // Submit a timer task
}

Next come several member variables of DispatchQueueImpl, the class that implements this logic:

    private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class);

    @Autowired
    ThreadFacade _threadFacade;

    private final HashMap<String, SyncTaskQueueWrapper> syncTasks = new HashMap<String, SyncTaskQueueWrapper>();
    private final HashMap<String, ChainTaskQueueWrapper> chainTasks = new HashMap<String, ChainTaskQueueWrapper>();
    private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class);

    public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";

The key members are syncTasks (the synchronous queues) and chainTasks (the asynchronous queues). They store the two kinds of task queues, each keyed by sync signature.

When submitting a ChainTask, remember to call next() explicitly once the task's work is done. Otherwise, the next task in the same queue will never be scheduled.

Next, let’s look at the code for the most common methods.

chainSubmit method

Start from the entry point in ThreadFacadeImpl:

    @Override
    public Future<Void> chainSubmit(ChainTask task) {
        return dpq.chainSubmit(task);
    }

Then the logic in DispatchQueueImpl:

    // Public method, one of the entries
    @Override
    public Future<Void> chainSubmit(ChainTask task) {
        return doChainSyncSubmit(task);
    }

    // Internal logic
    private <T> Future<T> doChainSyncSubmit(final ChainTask task) {
        assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???";
        DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least "));

        synchronized (chainTasks) {
            final String signature = task.getSyncSignature();
            ChainTaskQueueWrapper wrapper = chainTasks.get(signature);
            if (wrapper == null) {
                wrapper = new ChainTaskQueueWrapper();
                chainTasks.put(signature, wrapper);
            }

            ChainFuture cf = new ChainFuture(task);
            wrapper.addTask(cf);
            wrapper.startThreadIfNeeded();
            return cf;
        }
    }

The logic goes something like this:

  • Assert that the sync signature is not null and that the sync level (parallelism) is at least 1. With a level of 1, tasks with the same signature are queued and executed one after another by a single thread. A level above 1 specifies how many threads may work on tasks with the same signature at the same time.
  • Lock the HashMap<String, ChainTaskQueueWrapper> chainTasks and try to fetch the queue for this signature. If there is none, create a new queue for it; the wrapper records its maximum thread count and its signature when the first task is added. In either case, put the task into that queue.
  • Then call startThreadIfNeeded. "IfNeeded" means a new thread is started only while the number of threads working on this queue is below its limit. That thread is a task submitted to the thread pool. It takes a ChainFuture from the pending queue and runs it. When the pending queue is empty and no other thread of this queue is running, it removes the queue from the map.

    private class ChainTaskQueueWrapper {
        LinkedList pendingQueue = new LinkedList();
        final LinkedList runningQueue = new LinkedList();
        AtomicInteger counter = new AtomicInteger(0);
        int maxThreadNum = -1;
        String syncSignature;

        void addTask(ChainFuture task) {
            pendingQueue.offer(task);

            if (maxThreadNum == -1) {
                maxThreadNum = task.getSyncLevel();
            }
            if (syncSignature == null) {
                syncSignature = task.getSyncSignature();
            }
        }

        void startThreadIfNeeded() {
            // If the number of running threads has reached the limit, do not start another one
            if (counter.get() >= maxThreadNum) {
                return;
            }

            counter.incrementAndGet();
            _threadFacade.submit(new Task<Void>() {
                @Override
                public String getName() {
                    return "sync-chain-thread";
                }

                // start a new thread every time to avoid stack overflow
                @AsyncThread
                private void runQueue() {
                    ChainFuture cf;
                    synchronized (chainTasks) {
                        // remove from pending queue and add to running queue later
                        cf = (ChainFuture) pendingQueue.poll();

                        if (cf == null) {
                            if (counter.decrementAndGet() == 0) {
                                // No thread is working on this signature any more: remove its queue to free memory
                                chainTasks.remove(syncSignature);
                            }
                            // The pending queue is empty, so there is nothing left to run
                            return;
                        }
                    }

                    synchronized (runningQueue) {
                        // add to running queue
                        runningQueue.offer(cf);
                    }

                    // Run the task; when it calls next(), move it out of the running queue and poll the next one
                    cf.run(new SyncTaskChain() {
                        @Override
                        public void next() {
                            synchronized (runningQueue) {
                                runningQueue.remove(cf);
                            }
                            runQueue();
                        }
                    });
                }

                // This method is called by the thread pool as the entry point
                @Override
                public Void call() throws Exception {
                    runQueue();
                    return null;
                }
            });
        }
    }

syncSubmit method

The internal logic of syncSubmit is very similar to the chainSubmit we analyzed earlier. The difference is that the tasks are placed in a different map of queues (syncTasks instead of chainTasks).

Again, start from the entry point in ThreadFacadeImpl:

    @Override
    public <T> Future<T> syncSubmit(SyncTask<T> task) {
        return dpq.syncSubmit(task);
    }

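Then the implementation in DispatchQueueImpl. Note that a task whose sync level is 0 or less bypasses the queues and is submitted directly to the thread pool: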

    @Override
    public <T> Future<T> syncSubmit(SyncTask<T> task) {
        if (task.getSyncLevel() <= 0) {
            return _threadFacade.submit(task);
        } else {
            return doSyncSubmit(task);
        }
    }

Internal logic, a private method:

    private <T> Future<T> doSyncSubmit(final SyncTask<T> syncTask) {
        assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???";

        SyncTaskFuture f;
        synchronized (syncTasks) {
            SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature());
            if (wrapper == null) {
                wrapper = new SyncTaskQueueWrapper();
                // Put into the syncTasks queue.
                syncTasks.put(syncTask.getSyncSignature(), wrapper);
            }
            f = new SyncTaskFuture(syncTask);
            wrapper.addTask(f);
            wrapper.startThreadIfNeeded();
        }

        return f;
    }
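The source of SyncTaskQueueWrapper is not shown above. Because a SyncTask has no next() callback, the sketch below assumes that a queue slot is released as soon as call() returns. That is an assumption, not something the quoted code confirms. The sketch combines it with the getSyncLevel() <= 0 shortcut from syncSubmit. SyncQueueSketch and its methods are hypothetical names. Looping in drain() instead of resubmitting a pool task per item is a design choice of this sketch.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class SyncQueueSketch {
    private final ExecutorService pool;
    private final Map<String, Wrapper> queues = new HashMap<>(); // guarded by itself

    SyncQueueSketch(ExecutorService pool) { this.pool = pool; }

    // Like syncSubmit above: a sync level of 0 or less bypasses the queues entirely.
    <T> Future<T> syncSubmit(String signature, int syncLevel, Callable<T> task) {
        if (syncLevel <= 0) {
            return pool.submit(task);
        }
        FutureTask<T> future = new FutureTask<>(task);
        synchronized (queues) {
            Wrapper w = queues.computeIfAbsent(signature, sig -> new Wrapper(sig, syncLevel));
            w.pending.offer(future);
            w.startThreadIfNeeded();
        }
        return future;
    }

    private class Wrapper {
        final String signature;
        final int maxThreads;
        final Queue<Runnable> pending = new ArrayDeque<>();
        int running; // guarded by 'queues'

        Wrapper(String signature, int maxThreads) {
            this.signature = signature;
            this.maxThreads = maxThreads;
        }

        void startThreadIfNeeded() { // caller holds 'queues'
            if (running < maxThreads) {
                running++;
                pool.execute(this::drain);
            }
        }

        // Assumption: a slot is released as soon as call() returns, since SyncTask has no next().
        void drain() {
            while (true) {
                Runnable next;
                synchronized (queues) {
                    next = pending.poll();
                    if (next == null) {
                        if (--running == 0) {
                            queues.remove(signature, this);
                        }
                        return;
                    }
                }
                next.run(); // FutureTask.run() invokes call() and stores its result or exception
            }
        }
    }

    // Submits 'tasks' short tasks under one signature and reports the peak parallelism observed.
    static int peakConcurrency(int tasks, int syncLevel) {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        SyncQueueSketch queue = new SyncQueueSketch(pool);
        AtomicInteger current = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            futures.add(queue.syncSubmit("api.worker", syncLevel, () -> {
                peak.accumulateAndGet(current.incrementAndGet(), Math::max);
                Thread.sleep(10);
                current.decrementAndGet();
                return id;
            }));
        }
        try {
            for (Future<Integer> f : futures) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Take 20 tasks at a sync level of 5, as in the api.worker example earlier: the peak concurrency never exceeds 5. With a level of 1, the tasks run strictly one at a time.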

submitPeriodicTask

Submitting a periodic task is essentially implemented with the thread pool's scheduleAtFixedRate, which schedules a task at a fixed rate. The first run starts after the initial delay, and each subsequent run is scheduled one period after the start of the previous one. If a run takes longer than the period, the next run starts as soon as the previous one completes. Runs of the same task never overlap.

Calling this method also records the task and its ScheduledFuture in a map of periodic tasks. When a run throws an exception, the future is cancelled and the task is removed from that map.

    public Future<Void> submitPeriodicTask(final PeriodicTask task, long delay) {
        assert task.getInterval() != 0;
        assert task.getTimeUnit() != null;

        ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (Throwable e) {
                    _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
                    final Map<PeriodicTask, ScheduledFuture<?>> periodicTasks = getPeriodicTasks();
                    final ScheduledFuture<?> ft = periodicTasks.get(task);
                    if (ft != null) {
                        ft.cancel(true);
                        periodicTasks.remove(task);
                    } else {
                        _logger.warn("Not found feature for task " + task.getName()
                                + ", the exception happened too soon, will try to cancel the task next time the exception happens");
                    }
                }
            }
        }, delay, task.getInterval(), task.getTimeUnit());
        _periodicTasks.put(task, ret);
        return ret;
    }
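Here is a self-contained sketch of the same pattern that uses only the JDK. It combines scheduleAtFixedRate, a map from each task to its ScheduledFuture, and a catch block that cancels the task on its first failure. PeriodicDemo and the task that fails on its third run are made up for illustration. A plain Runnable stands in for PeriodicTask.

```java
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicDemo {
    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
    private final Map<Runnable, ScheduledFuture<?>> periodicTasks = new ConcurrentHashMap<>();

    // Runs 'task' at a fixed rate; on the first exception it is cancelled and unregistered.
    ScheduledFuture<?> submitPeriodicTask(Runnable task, long delayMs, long intervalMs) {
        ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                ScheduledFuture<?> self = periodicTasks.remove(task);
                if (self != null) {
                    self.cancel(false); // false: do not interrupt our own, current thread
                }
                // If 'self' is null the task failed before being registered; as in ZStack,
                // it keeps running and is cancelled the next time it fails.
            }
        }, delayMs, intervalMs, TimeUnit.MILLISECONDS);
        periodicTasks.put(task, future);
        return future;
    }

    // A task that fails on its third run; returns how many runs happened in total.
    static int runsBeforeCancel() {
        PeriodicDemo demo = new PeriodicDemo();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = demo.submitPeriodicTask(() -> {
            if (runs.incrementAndGet() == 3) {
                throw new IllegalStateException("simulated failure");
            }
        }, 50, 20);
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            demo.pool.shutdownNow();
        }
        if (!future.isCancelled()) {
            throw new IllegalStateException("the failing task should have been cancelled");
        }
        return runs.get();
    }
}
```

Even without the catch block, the JDK would stop the task: if any execution of a scheduleAtFixedRate task throws, subsequent executions are suppressed. Catching the exception, as ZStack does, lets the code log it and remove the task from its bookkeeping map. It also leaves the future cancelled instead of completed exceptionally.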

submitCancelablePeriodicTask

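submitCancelablePeriodicTask, in turn, checks the return value of task.run() on every execution. If the task asks to be cancelled by returning true, or if it throws an exception, its ScheduledFuture is cancelled and the task is removed from cancelablePeriodicTasks.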

    @Override
    public Future<Void> submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) {
        ScheduledFuture<Void> ret = (ScheduledFuture<Void>) _pool.scheduleAtFixedRate(new Runnable() {
            private void cancelTask() {
                ScheduledFuture<?> ft = cancelablePeriodicTasks.get(task);
                if (ft != null) {
                    ft.cancel(true);
                    cancelablePeriodicTasks.remove(task);
                } else {
                    _logger.warn("cannot find feature for task " + task.getName()
                            + ", the exception happened too soon, will try to cancel the task next time the exception happens");
                }
            }

            public void run() {
                try {
                    boolean cancel = task.run();
                    if (cancel) {
                        cancelTask();
                    }
                } catch (Throwable e) {
                    _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e);
                    cancelTask();
                }
            }
        }, delay, task.getInterval(), task.getTimeUnit());
        cancelablePeriodicTasks.put(task, ret);
        return ret;
    }
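The difference from submitPeriodicTask is that the task itself decides when to stop: if run() returns true, the task is cancelled. Below is a minimal JDK-only sketch of that contract. CancelableTask and CancelableDemo are hypothetical stand-ins for CancelablePeriodicTask and the facade. An AtomicReference replaces the cancelablePeriodicTasks map, and the 100 ms initial delay gives the scheduling thread time to store the future before the first run.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for CancelablePeriodicTask: run() returns true to request cancellation.
interface CancelableTask {
    boolean run();
}

class CancelableDemo {
    // Schedules a task that asks to be cancelled on its 'stopAfter'-th run; returns the run count.
    static int runsUntilSelfCancel(int stopAfter) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        CancelableTask task = () -> runs.incrementAndGet() >= stopAfter;
        // Holds the task's own future; the 100 ms initial delay leaves time to set it.
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
            boolean cancel;
            try {
                cancel = task.run();
            } catch (Throwable e) {
                cancel = true; // as in ZStack, an unexpected exception also cancels the task
            }
            ScheduledFuture<?> me = self.get();
            if (cancel && me != null) {
                me.cancel(false);
            }
        }, 100, 10, TimeUnit.MILLISECONDS);
        self.set(future);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return future.isCancelled() ? runs.get() : -1;
    }
}
```

runsUntilSelfCancel(5) returns 5. The fifth run returns true and the future is cancelled, so no sixth run is scheduled.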

Initialization

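ThreadFacadeImpl implements the Component interface, but unlike a typical ZStack Component it does not rely on start() for its initialization. Instead, initialization follows the Spring bean lifecycle; note default-init-method="init" and default-destroy-method="destroy" in the bean definition. See the ThreadFacade bean definition: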

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://zstack.org/schema/zstack http://zstack.org/schema/zstack/plugin.xsd"
	default-init-method="init" default-destroy-method="destroy">

	<bean id="ThreadFacade" class="org.zstack.core.thread.ThreadFacadeImpl">
		<property name="totalThreadNum" value="500" />
		<!-- don't declare Component extension, it's specially handled -->
	</bean>

	<bean id="ThreadAspectj" class="org.zstack.core.aspect.ThreadAspect" factory-method="aspectOf" />

</beans>

Now let's go back to the init and destroy methods of ThreadFacadeImpl:

    // init
    public void init() {
        // Read the maximum number of threads in the thread pool from the global configuration
        totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM;
        if (totalThreadNum < 10) {
            _logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM));
            totalThreadNum = 10;
        }
        // Build a thread pool that supports delayed tasks; ThreadFacadeImpl itself is passed
        // as both the ThreadFactory and the RejectedExecutionHandler
        _pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this);
        _logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum));
        // Build a DispatchQueue
        dpq = new DispatchQueueImpl();

        jmxf.registerBean("ThreadFacade", this);
    }

    // destroy
    public void destroy() {
        _pool.shutdownNow();
    }

You may notice that this kind of shutdown is rather brutal. shutdownNow() interrupts every thread that is executing a task and removes the tasks still waiting in the queue without running them. As mentioned earlier, ThreadFacadeImpl also implements the Component interface, whose start and stop methods let ZStack hook into a component's lifecycle. Its stop method uses shutdown() instead, which stops accepting new tasks without interrupting the ones already running:

    // stop method
    @Override
    public boolean stop() {
        _pool.shutdown();
        timerPool.stop();
        return true;
    }
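A small JDK-only experiment shows the practical difference between the two shutdown methods. It uses a single-thread pool that holds one slow task and two queued ones. ShutdownDemo is a hypothetical name, and the 200 ms sleep is arbitrary.

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // One slow task and two quick ones on a single thread, then shutdown() or shutdownNow().
    // Returns {tasks that completed, queued tasks handed back without running}.
    static int[] run(boolean now) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(200);
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow(): not completed
            }
        });
        pool.execute(completed::incrementAndGet);
        pool.execute(completed::incrementAndGet);
        int neverStarted = 0;
        try {
            started.await(); // make sure the slow task is running before shutting down
            if (now) {
                List<Runnable> pending = pool.shutdownNow(); // interrupts workers, drains the queue
                neverStarted = pending.size();
            } else {
                pool.shutdown(); // rejects new tasks but lets queued ones finish
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), neverStarted};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run(false))); // prints [3, 0]
        System.out.println(java.util.Arrays.toString(run(true)));  // prints [0, 2]
    }
}
```

shutdown() lets all three tasks finish. shutdownNow() interrupts the slow task and hands back the two queued tasks without running them. ZStack's pool is a ScheduledThreadPoolExecutor, and for that class shutdown() by default also cancels periodic tasks while still running already-scheduled one-shot delayed tasks. Both policies can be changed with setContinueExistingPeriodicTasksAfterShutdownPolicy and setExecuteExistingDelayedTasksAfterShutdownPolicy.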

Thread factory

ThreadFacadeImpl also implements ThreadFactory, which lets it customize each thread at creation time:

    @Override
    public Thread newThread(Runnable arg0) {
        return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement()));
    }

Here ZStack gives every new thread a name with an increasing sequence number (zs-thread-N), which makes threads easy to identify in thread dumps and logs.
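Here is a minimal stand-alone version of the same idea, using a hypothetical NamingThreadFactory class. The starting value of the counter (0) is an assumption, since the declaration of seqNum is not shown above.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Names each new thread "zs-thread-<n>", in the spirit of ThreadFacadeImpl.newThread().
class NamingThreadFactory implements ThreadFactory {
    private final AtomicInteger seqNum = new AtomicInteger(0); // starting value is an assumption

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "zs-thread-" + seqNum.getAndIncrement());
    }

    // Returns the name of the pool thread that actually executed a task.
    static String workerName() {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamingThreadFactory());
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName()); // prints zs-thread-0
    }
}
```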

The thread pool

ZStack extends the JDK's thread pool with hook functions that run before and after each task, and exposes methods to register and unregister such hooks.

package org.zstack.core.thread;

import org.apache.logging.log4j.ThreadContext;
import org.zstack.utils.logging.CLogger;
import org.zstack.utils.logging.CLoggerImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;


public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor {
    private static final CLogger _logger = CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class);
    
    List<ThreadAroundHook> _hooks = new ArrayList<ThreadAroundHook>(8);

    public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
        this.setMaximumPoolSize(corePoolSize);
    }
    
    public void registerHook(ThreadAroundHook hook) {
        synchronized (_hooks) {
            _hooks.add(hook);
        }
    }

    public void unregisterHook(ThreadAroundHook hook) {
        synchronized (_hooks) {
            _hooks.remove(hook);
        }
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        ThreadContext.clearMap();
        ThreadContext.clearStack();

        ThreadAroundHook debugHook = null;
        List<ThreadAroundHook> tmpHooks;       
        synchronized (_hooks) {
            tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
        }
        
        for (ThreadAroundHook hook : tmpHooks) {
            debugHook = hook;
            try {
                hook.beforeExecute(t, r);
            } catch (Exception e) {
                _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ThreadContext.clearMap();
        ThreadContext.clearStack();

        ThreadAroundHook debugHook = null;
        List<ThreadAroundHook> tmpHooks;
        synchronized (_hooks) {
            tmpHooks = new ArrayList<ThreadAroundHook>(_hooks);
        }
        
        for (ThreadAroundHook hook : tmpHooks) {
            debugHook = hook;
            try {
                hook.afterExecute(r, t);
            } catch (Exception e) {
                _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e);
            }
        }
    }
}
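The hook mechanism can be reproduced on a plain ThreadPoolExecutor. Its protected beforeExecute and afterExecute methods are the same ones ScheduledThreadPoolExecutorExt overrides. In this sketch, AroundHook and HookedPool are hypothetical names, and the Log4j ThreadContext cleanup is left out. A CopyOnWriteArrayList is swapped in for the synchronized copy of the hook list.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical hook interface modelled on ThreadAroundHook.
interface AroundHook {
    void beforeExecute(Thread t, Runnable r);

    void afterExecute(Runnable r, Throwable t);
}

class HookedPool extends ThreadPoolExecutor {
    // CopyOnWriteArrayList iterates over a snapshot, replacing the "synchronized + copy" idiom.
    private final List<AroundHook> hooks = new CopyOnWriteArrayList<>();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    void registerHook(AroundHook hook) { hooks.add(hook); }

    void unregisterHook(AroundHook hook) { hooks.remove(hook); }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        for (AroundHook hook : hooks) {
            try {
                hook.beforeExecute(t, r);
            } catch (RuntimeException e) {
                // A broken hook must not stop the other hooks or the task itself.
                System.err.println("hook failed: " + e.getMessage());
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        for (AroundHook hook : hooks) {
            try {
                hook.afterExecute(r, t);
            } catch (RuntimeException e) {
                System.err.println("hook failed: " + e.getMessage());
            }
        }
        super.afterExecute(r, t);
    }

    // Registers a failing hook and a recording hook, runs one task, returns the recorded events.
    static List<String> demo() {
        HookedPool pool = new HookedPool(1);
        List<String> events = new CopyOnWriteArrayList<>();
        pool.registerHook(new AroundHook() {
            public void beforeExecute(Thread t, Runnable r) { throw new IllegalStateException("broken hook"); }
            public void afterExecute(Runnable r, Throwable t) { }
        });
        pool.registerHook(new AroundHook() {
            public void beforeExecute(Thread t, Runnable r) { events.add("before"); }
            public void afterExecute(Runnable r, Throwable t) { events.add("after"); }
        });
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }
}
```

demo() returns [before, task, after]. The exception from the first hook is caught and reported, and neither the second hook nor the task is affected. This is the same guarantee the per-hook try/catch gives in ZStack.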

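In addition, ScheduledThreadPoolExecutorExt inherits from ScheduledThreadPoolExecutor, so it is essentially a task-scheduling thread pool whose work queue is a delayed work queue. That queue is unbounded and the pool works with a fixed number of core threads, so adjusting the maximum pool size has no useful effect. The setMaximumPoolSize(corePoolSize) call in the constructor therefore does not change the pool's behavior.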

Summary

This article examined a core, production-proven ZStack component: its thread pool. Thread pooling makes parallel programming considerably less complicated.

Of course, there are some areas that could be improved:

  • The synchronized blocks could be replaced with concurrent containers, which would improve throughput and reduce the overhead of lock contention.
  • **When a large number of tasks is submitted, HashMap performance suffers from repeated resizing.** Consider a fixed-size map that hashes keys into a fixed number of entries, so that the data structure does not keep growing.
  • The queues are unbounded. When a large number of task requests pile up, they can place a significant burden on memory.
  • Task queues have no timeout logic. Most calls in ZStack go through MQ, and each message has a timeout, but individual tasks are never checked against a timeout. When one task runs for too long, the tasks behind it may already have timed out, yet they are never removed from the queue. Combined with the unbounded queues mentioned above, this is a potential disaster. To solve this, one can draw on ZooKeeper's session buckets or Kafka's timing wheel.
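The last point can be illustrated with a sketch of the bucket idea, in the spirit of ZooKeeper's session expiry. Deadlines are rounded up to the next tick, so all tasks expiring in the same tick share one bucket. A sweeper can then expire a whole bucket at once instead of checking every task. ExpiryBuckets is hypothetical and is not wired into DispatchQueueImpl. In practice, a periodic task would call expire(now) once per tick and fail the tasks it returns, and a task that finishes in time would be removed first. The methods are simply synchronized to keep the sketch short and correct.

```java
import java.util.*;

// Hypothetical sketch: groups items by deadline, rounded up to the next tick,
// so one sweep can expire whole buckets at a time.
class ExpiryBuckets<T> {
    private final long tickMs;
    private final TreeMap<Long, Set<T>> buckets = new TreeMap<>();
    private final Map<T, Long> bucketOf = new HashMap<>();

    ExpiryBuckets(long tickMs) {
        this.tickMs = tickMs;
    }

    // An item never expires before its deadline and at most one tick after it.
    private long roundUp(long deadlineMs) {
        return (deadlineMs / tickMs + 1) * tickMs;
    }

    synchronized void add(T item, long deadlineMs) {
        remove(item); // re-adding an item moves it to its new bucket
        long bucket = roundUp(deadlineMs);
        bucketOf.put(item, bucket);
        buckets.computeIfAbsent(bucket, k -> new HashSet<>()).add(item);
    }

    // Called when a task finishes in time, so it is never reported as expired.
    synchronized void remove(T item) {
        Long bucket = bucketOf.remove(item);
        if (bucket != null) {
            Set<T> set = buckets.get(bucket);
            set.remove(item);
            if (set.isEmpty()) {
                buckets.remove(bucket);
            }
        }
    }

    // Removes and returns every item whose bucket time is at or before nowMs.
    synchronized List<T> expire(long nowMs) {
        List<T> expired = new ArrayList<>();
        while (!buckets.isEmpty() && buckets.firstKey() <= nowMs) {
            for (T item : buckets.pollFirstEntry().getValue()) {
                bucketOf.remove(item);
                expired.add(item);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpiryBuckets<String> b = new ExpiryBuckets<>(100);
        b.add("task-a", 1_030); // bucket 1100
        b.add("task-b", 1_090); // bucket 1100
        b.add("task-c", 1_250); // bucket 1300
        b.remove("task-b");     // finished in time
        System.out.println(b.expire(1_099)); // prints []
        System.out.println(b.expire(1_100)); // prints [task-a]
        System.out.println(b.expire(1_300)); // prints [task-c]
    }
}
```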