AbortPolicyWithReport

Dubbo implements the rejection policy AbortPolicyWithReport, thread pool EagerThreadPoolExecutor, thread factory NameThreadFactory, TaskQueue, and thread service ExecuteService-Threa DlessExecutor, you need to learn these custom usages.


AbortPolicyWithReport adds WithReport functionality to the previously supported AbortPolicy (the default rejection policy is AbortPolicy, which is to throw exceptions). Add the JDK’s four rejection policies:

When the task cache queue of the thread pool is full and the number of threads in the thread pool reaches maximumPoolSize, the task rejection policy is adopted if additional tasks arrive. There are usually four strategies: ThreadPoolExecutor. AbortPolicy: discard task and throw RejectedExecutionException anomalies. ThreadPoolExecutor. DiscardPolicy: discard task, but does not throw an exception. ThreadPoolExecutor. DiscardOldestPolicy: discard queue in front of the task, and then resubmit the rejected task ThreadPoolExecutor. CallerRunsPolicy: by the calling thread (submit task thread) with this taskCopy the code

Class, attribute, and constructor are as follows. Inheritance ThreadPoolExecutor AbortPolicy class, rewrite the rejectedExecution method. The following SPI ThreadPool uses this rejection policy and uses its constructor, passing the same name as the one passed to NameThreadFactory. Properties from lastPrintTime to Guard are used by the later dumpJStack methods. LastPrintTime and TEN_MINUTES_MILLS control the dump interval. OS_WIN_PREFIX to DEFAULT_DATETIME_FORMAT is used to set different date formats for different systems. Guard semaphores control concurrency. Multiple threads concurrent with dumpJStack allow only one thread to dump at the current time.

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static final String OS_WIN_PREFIX = "win";

    private static final String OS_NAME_KEY = "os.name";

    private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";

    private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";

    private static Semaphore guard = new Semaphore(1);

    // The discovery is mainly passing in a custom reject policy at new ThreadPoolExecutor
    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }
Copy the code

RejectedExecution: The rejectedExecution method overridden from AbortPolicy is invoked when a thread pool is rejected. Go straight to the notes.

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // the r argument is not used
    String msg = String.format("Thread pool is EXHAUSTED!" +  // Thread pool is EXHAUSTED! Thread pool EXHAUSTED and holding too many tasks trigger reject action
            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
            + "%d)," +
            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
        threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
        e.getLargestPoolSize(),
        e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
        url.getProtocol(), url.getIp(), url.getPort());
    / / 1. The call log AbortPolicyWithReportTest, for example: Thread pool is EXHAUSTED! Thread Name: Test, Pool Size: 0 (active: 0, core: 1, max: 1, largest: 0), Task: 0 (completed: 0), Executor status: (isShutdown: false, isTerminated: false, isTerminating: false), in dubbo: / / 10.20.130.230:20880!
    logger.warn(msg);

    // 2. Dump the stack information of the current system thread
    dumpJStack();

    // 3. Handle the event, the usage of the event monitoring model: after the event occurs, let the corresponding listener process, enter
    dispatchThreadPoolExhaustedEvent(msg); // dispatch: dispatch; Deal with it quickly,

    // 4. Finally, throw the exception, which is the same as AbortPolicy's rejectedExecution method, but with some other operations
    throw new RejectedExecutionException(msg);
}
Copy the code

Notify Threadpoolhaustedevent of events to relevant listeners.

public void dispatchThreadPoolExhaustedEvent(String msg) {
    / / dispatch
    EventDispatcher.getDefaultExtension().dispatch(new ThreadPoolExhaustedEvent(this, msg));
}
Copy the code

DumpJStack reads the comments directly, and the dump process is performed by the new thread pool. The jvmutil.jstack (jStackStream) method is called, which is basically the process of writing files to JDK APIS, but I won’t go into detail here. By default, files are generated in the user directory. Of course, you can also set the dump.directory parameter value in the URL. The file name is dubbo_jstack.log.2021-03-15_11/23/18

private void dumpJStack(a) {
    long now = System.currentTimeMillis();

    // dump every 10 minutes
    if (now - lastPrintTime < TEN_MINUTES_MILLS) {
        return;
    }

    // The semaphore limits concurrency to 1
    if(! guard.tryAcquire()) {return;
    }

    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
        String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));

        SimpleDateFormat sdf;

        String os = System.getProperty(OS_NAME_KEY).toLowerCase();

        // window system don't support ":" in file name
        if (os.contains(OS_WIN_PREFIX)) {
            sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
        } else {
            sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
        }

        String dateStr = sdf.format(new Date());
        // try-with-resources
        try (FileOutputStream jStackStream = new FileOutputStream(
            new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) { // new File The second argument is child and the specific File name
            / / in
            JVMUtil.jstack(jStackStream);
        } catch (Throwable t) {
            logger.error("dump jStack error", t);
        } finally {
            // Release semaphore
            guard.release();
        }
        lastPrintTime = System.currentTimeMillis();
    });

    // must shutdown thread pool ,if not will lead to OOM
    pool.shutdown();

}
Copy the code