
Abstract: originally published at www.iocoder.cn/Eureka/batc… by “Taro source”. Reprints are welcome; please keep this summary. Thank you!

This article is mainly based on Eureka version 1.8.X

  • 1. Overview
  • 2. Overall process
  • 3. Task processor
  • 4. Create a task dispatcher
    • 4.1 Batch task dispatcher
    • 4.2 Single-task dispatcher
  • 5. Create a task acceptor executor
  • 6. Create a task executor
    • 6.1 Creating a Batch Task Executor
    • 6.2 Creating a Single-task Executor
    • 6.3 Worker thread abstract class
  • 7. Network traffic shaper
  • 8. Task acceptor executor [Processing tasks]
  • 9. Task acceptor thread [Scheduling tasks]
  • 10. Task executor [Executing tasks]
    • 10.1 Batch task worker threads
    • 10.2 Single-task worker thread
  • 666. Easter egg


1. Overview

This article focuses on batch task processing. Eureka-Server clusters synchronize application instance registrations through batched tasks, so this article also lays the groundwork for the later article on Eureka-Server cluster synchronization.

The classes covered here live in the com.netflix.eureka.util.batcher package. The class diagram of the main classes is as follows:

  • Purple: the task dispatcher
  • Blue: the task acceptor executor
  • Red: the task executor
  • Green: the task processor
  • Yellow: the task holder (TaskHolder)


2. Overall process

The overall process of task execution is as follows:

  • Thin arrows: the actions a task goes through.
  • Bold arrows: the direction in which tasks flow between queues.
  • A submitted task is not executed immediately, either synchronously or asynchronously. Instead, task execution is split across three layers of queues:

    • Layer 1: the acceptor queue (acceptorQueue) and the reprocess queue (reprocessQueue).

      • Blue line: the dispatcher submits a task to the acceptor queue; the task is not actually executed yet.
      • Yellow line: when a worker thread of the executor fails to process a task, it submits the failed task to the reprocess queue, provided the failure is retryable (see section 3).
    • Layer 2: the pending queue (processingOrder).

      • Pink line: the acceptor thread (Runner) moves tasks from the reprocess queue and the acceptor queue into the pending queue.
    • Layer 3: the work queue (workQueue).

      • Pink line: the acceptor thread (Runner) merges tasks from the pending queue into batch tasks of at most maxBatchingSize tasks each, and schedules (submits) them to the work queue.
      • Yellow line: the worker threads of the executor each pull one batch task at a time from the work queue and execute it.
  • Benefits of the three-layer queue design:

    • The acceptor queue keeps the submitter from blocking while tasks are being processed.
    • The acceptor thread (Runner) merges tasks: tasks with the same task ID (yes, tasks have IDs) are combined and executed only once.
    • Eureka-Server exposes a batch interface that operates on multiple application instances at once for cluster synchronization, so one batch task needs only one call to that interface, avoiding the overhead of many separate calls. The price is that tasks must wait to be merged, which increases the delay before registrations and cancellations propagate across the Eureka-Server cluster. After all, Eureka chooses AP in the CAP trade-off.
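The merge-by-ID benefit described above can be illustrated with a small sketch. It is not the Eureka implementation: PendingBuffer and its methods are hypothetical names, and expiry and capacity handling are left out. It only shows how a map keyed by task ID plus an order queue lets a newer task replace a pending one with the same ID.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical buffer that keeps one pending task per ID, in first-seen order. */
class PendingBuffer<ID, T> {
    private final Map<ID, T> pendingTasks = new HashMap<>();
    private final Deque<ID> processingOrder = new ArrayDeque<>();

    /** Adds a task; a task whose ID is already pending replaces the old one in place. */
    void add(ID id, T task) {
        T previous = pendingTasks.put(id, task);
        if (previous == null) {
            processingOrder.add(id); // first time this ID is seen: remember its position
        }
    }

    /** Removes and returns up to maxBatchSize tasks in queue order. */
    List<T> nextBatch(int maxBatchSize) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !processingOrder.isEmpty()) {
            batch.add(pendingTasks.remove(processingOrder.poll()));
        }
        return batch;
    }
}

class PendingBufferDemo {
    public static void main(String[] args) {
        PendingBuffer<String, String> buffer = new PendingBuffer<>();
        buffer.add("app-1", "register");
        buffer.add("app-2", "register");
        buffer.add("app-1", "cancel"); // same ID: overrides the pending "register"
        System.out.println(buffer.nextBatch(10)); // prints [cancel, register]
    }
}
```

Three submissions turn into a single batch of two tasks, which is the saving the list above refers to.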

3. Task processor

com.netflix.eureka.util.batcher.TaskProcessor is the task processor interface. The interface code is as follows:

public interface TaskProcessor<T> {

    /**
     * A processed task/task list ends up in one of the following states:
     * <ul>
     *     <li>{@code Success} processing finished successfully</li>
     *     <li>{@code TransientError} processing failed, but shall be retried later</li>
     *     <li>{@code PermanentError} processing failed, and is non recoverable</li>
     * </ul>
     */
    enum ProcessingResult {
        /**
         * Success
         */
        Success,
        /**
         * Congestion error
         */
        Congestion,
        /**
         * Transient error
         */
        TransientError,
        /**
         * Permanent error
         */
        PermanentError
    }

    /**
     * Process a single task.
     * In non-batched mode a single task is processed at a time.
     */
    ProcessingResult process(T task);

    /**
     * Process a batch of tasks.
     *
     * For batched mode a collection of tasks is run at a time. The result is provided for the aggregated result,
     * and all tasks are handled in the same way according to what is returned (for example are rescheduled, if the
     * error is transient).
     */
    ProcessingResult process(List<T> tasks);
}
  • ProcessingResult: the result of processing a task.
    • Success: processing succeeded.
    • Congestion: congestion error; the task will be retried. For example, the request was rate limited.
    • TransientError: transient error; the task will be retried. For example, a network request timed out.
    • PermanentError: permanent error; the task will be discarded. For example, an exception was thrown during execution.
  • #process(task): processes a single task.
  • #process(tasks): processes a batch of tasks.
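To make the four result states more concrete, here is a minimal sketch of how a processor might classify outcomes. It is an illustration under assumptions, not Eureka's own logic: ResultClassifier, fromStatus, fromFailure and shouldRetry are hypothetical names, treating HTTP 503 as congestion is an assumed convention, and the enum is redefined only so the example compiles on its own.

```java
import java.io.IOException;

/** Mirrors the four result states of the interface above; redefined so the sketch is self-contained. */
enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

class ResultClassifier {
    /** Hypothetical mapping from an HTTP status code to a processing result. */
    static ProcessingResult fromStatus(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return ProcessingResult.Success;
        }
        if (statusCode == 503) {
            return ProcessingResult.Congestion; // assumed: the server is throttling requests
        }
        return ProcessingResult.PermanentError;
    }

    /** Hypothetical mapping from a thrown failure to a processing result. */
    static ProcessingResult fromFailure(Throwable failure) {
        return failure instanceof IOException
                ? ProcessingResult.TransientError   // e.g. a timed-out network request
                : ProcessingResult.PermanentError;  // e.g. a bug in the task itself
    }

    /** Congestion and TransientError are retried; the other states are final. */
    static boolean shouldRetry(ProcessingResult result) {
        return result == ProcessingResult.Congestion || result == ProcessingResult.TransientError;
    }
}
```

For batches, one result applies to the whole list, so a single retryable result sends every task in the batch back for another attempt.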

4. Create a task dispatcher

com.netflix.eureka.util.batcher.TaskDispatcher is the task dispatcher interface. The interface code is as follows:

public interface TaskDispatcher<ID, T> {

    void process(ID id, T task, long expiryTime);

    void shutdown();
}
  • #process(...): submits the task ID, the task, and the task expiry time to the task dispatcher for processing.

com.netflix.eureka.util.batcher.TaskDispatchers is the factory class for creating task dispatchers. Internally, it provides two task dispatcher implementations:

  • The batch task dispatcher, used to synchronize registration information across the Eureka-Server cluster.
  • The single-task dispatcher, used by Eureka-Server to synchronize status to the Amazon AWS Auto Scaling Group (ASG). This series does not analyze the AWS-related code for now, but the single-task dispatcher is still covered in this article as a utility class.

com.netflix.eureka.cluster.ReplicationTaskProcessor implements TaskProcessor and is the task processor for the Eureka-Server cluster. Interested readers can look into it on their own; it will be analyzed in detail in the article on Eureka-Server cluster synchronization.

4.1 Batch task dispatcher

TaskDispatchers#createBatchingTaskDispatcher(...) creates the batch task dispatcher. The implementation is as follows:

// TaskDispatchers.java
 1: /**
 2:  * Create a dispatcher for batch task execution
 3:  *
 4:  * @param id ID of the task executor
 5:  * @param maxBufferSize maximum size of the pending queue
 6:  * @param workloadSize maximum number of tasks in a single batch task
 7:  * @param workerCount number of worker threads in the task executor
 8:  * @param maxBatchingDelay maximum delay to wait for a batch task, in milliseconds
 9:  * @param congestionRetryDelayMs retry delay after a request is rate limited, in milliseconds
10:  * @param networkFailureRetryMs retry delay after a network failure, in milliseconds
11:  * @param taskProcessor task processor
12:  * @param <ID> task ID type
13:  * @param <T> task type
14:  * @return dispatcher for batch task execution
15:  */
16: public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
17:         int maxBufferSize,
18:         int workloadSize,
19:         int workerCount,
20:         long maxBatchingDelay,
21:         long congestionRetryDelayMs,
22:         long networkFailureRetryMs,
23:         TaskProcessor<T> taskProcessor) {
24:     // Create the task acceptor executor
25:     final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
26:             id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
27:     );
28:     // Create the batch task executor
29:     final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
30:     // Create the batch task dispatcher
31:     return new TaskDispatcher<ID, T>() {
32:         @Override
33:         public void process(ID id, T task, long expiryTime) {
34:             acceptorExecutor.process(id, task, expiryTime);
35:         }
36:
37:         @Override
38:         public void shutdown() {
39:             acceptorExecutor.shutdown();
40:             taskExecutor.shutdown();
41:         }
42:     };
43: }
  • Lines 1 through 23: method parameters. There are quite a few, so please be patient.
    • workloadSize parameter: the maximum number of tasks in a single batch task.
    • taskProcessor parameter: a custom task processor implementation.
  • Lines 24 through 27: create the task acceptor executor. See section 5 for details.
  • Lines 28 through 29: create the batch task executor. See section 6.1 for details.
  • Lines 30 to 42: create the batch task dispatcher.
    • Lines 32 to 35: the #process() implementation calls AcceptorExecutor#process(...), handing the task ID, the task, and the task expiry time to the task acceptor executor for processing.

4.2 Single-task dispatcher

TaskDispatchers#createNonBatchingTaskDispatcher(...) creates the single-task dispatcher. The implementation is as follows:

 1: /**
 2:  * Create a dispatcher for single-task execution
 3:  *
 4:  * @param id ID of the task executor
 5:  * @param maxBufferSize maximum size of the pending queue
 6:  * @param workerCount number of worker threads in the task executor
 7:  * @param maxBatchingDelay maximum delay to wait for a batch task, in milliseconds
 8:  * @param congestionRetryDelayMs retry delay after a request is rate limited, in milliseconds
 9:  * @param networkFailureRetryMs retry delay after a network failure, in milliseconds
10:  * @param taskProcessor task processor
11:  * @param <ID> task ID type
12:  * @param <T> task type
13:  * @return dispatcher for single-task execution
14:  */
15: public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
16:         int maxBufferSize,
17:         int workerCount,
18:         long maxBatchingDelay,
19:         long congestionRetryDelayMs,
20:         long networkFailureRetryMs,
21:         TaskProcessor<T> taskProcessor) {
22:     // Create the task acceptor executor
23:     final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
24:             id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
25:     );
26:     final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
27:     return new TaskDispatcher<ID, T>() {
28:         @Override
29:         public void process(ID id, T task, long expiryTime) {
30:             acceptorExecutor.process(id, task, expiryTime);
31:         }
32:
33:         @Override
34:         public void shutdown() {
35:             acceptorExecutor.shutdown();
36:             taskExecutor.shutdown();
37:         }
38:     };
39: }
  • Lines 1 through 21: method parameters. There are quite a few, so please be patient.
    • workloadSize parameter: compared with #createBatchingTaskDispatcher(...), this parameter is gone. On line 24 you can see that 1 is passed to AcceptorExecutor in its place.
    • taskProcessor parameter: a custom task processor implementation.
  • Lines 22 to 25: create the task acceptor executor. The only difference from #createBatchingTaskDispatcher(...) is workloadSize = 1. See section 5 for details.
  • Line 26: create the single-task executor. This is the major difference from #createBatchingTaskDispatcher(...). See section 6.2 for details.
  • Lines 27 to 38: create the single-task dispatcher, in the same way as #createBatchingTaskDispatcher(...).

5. Create a task acceptor executor

com.netflix.eureka.util.batcher.AcceptorExecutor is the task acceptor executor. Its fields and constructor are as follows:

class AcceptorExecutor<ID, T> {

    private static final Logger logger = LoggerFactory.getLogger(AcceptorExecutor.class);

    /**
     * Maximum size of the pending queue
     * {@link #processingOrder}
     */
    private final int maxBufferSize;
    /**
     * Maximum number of tasks in a single batch task
     */
    private final int maxBatchingSize;
    /**
     * Maximum delay to wait for a batch task, in milliseconds
     */
    private final long maxBatchingDelay;

    /**
     * Whether the executor is shut down
     */
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    /**
     * Acceptor queue
     */
    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
    /**
     * Reprocess queue
     */
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
    /**
     * Acceptor thread
     */
    private final Thread acceptorThread;

    /**
     * Pending task map
     */
    private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
    /**
     * Pending queue
     */
    private final Deque<ID> processingOrder = new LinkedList<>();

    /**
     * Single-task work request semaphore
     */
    private final Semaphore singleItemWorkRequests = new Semaphore(0);
    /**
     * Single-task work queue
     */
    private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

    /**
     * Batch task work request semaphore
     */
    private final Semaphore batchWorkRequests = new Semaphore(0);
    /**
     * Batch task work queue
     */
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

    /**
     * Network traffic shaper
     */
    private final TrafficShaper trafficShaper;

    AcceptorExecutor(String id,
                     int maxBufferSize,
                     int maxBatchingSize,
                     long maxBatchingDelay,
                     long congestionRetryDelayMs,
                     long networkFailureRetryMs) {
        this.maxBufferSize = maxBufferSize;
        this.maxBatchingSize = maxBatchingSize;
        this.maxBatchingDelay = maxBatchingDelay;

        // Create the network traffic shaper
        this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);

        // Create and start the acceptor thread
        ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
        this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
        this.acceptorThread.setDaemon(true);
        this.acceptorThread.start();

        // TODO (code omitted) Taro: monitoring related, ignored for now
    }
}

6. Create a task executor

com.netflix.eureka.util.batcher.TaskExecutors is the task executor. It provides factory methods for creating single-task and batch task executors. The TaskExecutors constructor is as follows:

class TaskExecutors<ID, T> {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutors.class);

    /**
     * Whether the executor is shut down
     */
    private final AtomicBoolean isShutdown;
    /**
     * Worker thread pool
     */
    private final List<Thread> workerThreads;

    TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
        this.isShutdown = isShutdown;
        this.workerThreads = new ArrayList<>();

        // Create the worker thread pool
        ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
        for (int i = 0; i < workerCount; i++) {
            WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
            Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
            workerThreads.add(workerThread);
            workerThread.setDaemon(true);
            workerThread.start();
        }
    }
    
    /**
     * Factory for creating worker threads
     *
     * @param <ID> task ID type
     * @param <T> task type
     */
    interface WorkerRunnableFactory<ID, T> {
        WorkerRunnable<ID, T> create(int idx);
    }
}
  • workerThreads field: the worker thread pool. Worker threads pull from the work queue concurrently and execute tasks concurrently.
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory: the worker thread factory interface. Single-task and batch task executors use different worker thread implementations, so each worker is created through a custom factory.
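The factory-plus-thread-list pattern above can be tried out in isolation. The following is a simplified sketch, not the Eureka class: WorkerFactory, WorkerPool and WorkerPoolDemo are hypothetical names, the workers pull plain Runnable items from a queue, and the shutdown method (interrupting every worker once) is an assumption about how such a pool would typically be stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical factory: builds the Runnable for worker number idx. */
interface WorkerFactory {
    Runnable create(int idx);
}

/** Hypothetical pool: starts workerCount daemon threads, all built by the same factory. */
class WorkerPool {
    private final AtomicBoolean isShutdown;
    private final List<Thread> workerThreads = new ArrayList<>();

    WorkerPool(WorkerFactory factory, int workerCount, AtomicBoolean isShutdown) {
        this.isShutdown = isShutdown;
        for (int i = 0; i < workerCount; i++) {
            Thread thread = new Thread(factory.create(i), "worker-" + i);
            thread.setDaemon(true);
            workerThreads.add(thread);
            thread.start();
        }
    }

    void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            for (Thread thread : workerThreads) {
                thread.interrupt(); // wake up workers blocked on the queue
            }
        }
    }
}

class WorkerPoolDemo {
    /** Pushes `tasks` items through `workers` threads; returns true if all ran within 5 seconds. */
    static boolean runAll(int workers, int tasks) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(tasks);
        AtomicBoolean isShutdown = new AtomicBoolean();
        WorkerPool pool = new WorkerPool(idx -> () -> {
            try {
                while (!isShutdown.get()) {
                    Runnable task = workQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        task.run();
                    }
                }
            } catch (InterruptedException e) {
                // interrupted by shutdown(): let the worker exit
            }
        }, workers, isShutdown);
        for (int i = 0; i < tasks; i++) {
            workQueue.add(done::countDown);
        }
        boolean finished;
        try {
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        pool.shutdown();
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(runAll(3, 20));
    }
}
```

Because the pool only knows the factory, swapping the worker behavior (batch versus single task) does not require touching the pool itself.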

6.1 Creating a Batch Task Executor

TaskExecutors#batchExecutors(...) creates the batch task executor. The implementation is as follows:

/**
 * Create a batch task executor
 *
 * @param name task executor name
 * @param workerCount number of worker threads in the task executor
 * @param processor task processor
 * @param acceptorExecutor task acceptor executor
 * @param <ID> task ID type
 * @param <T> task type
 * @return batch task executor
 */
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                   int workerCount,
                                                   final TaskProcessor<T> processor,
                                                   final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    // Create the batch task executor
    return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {
        @Override
        public WorkerRunnable<ID, T> create(int idx) {
            return new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx /* thread name */, isShutdown, metrics, processor, acceptorExecutor);
        }
    }, workerCount, isShutdown);
}
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable: the batch task worker thread.

6.2 Creating a Single-task Executor

TaskExecutors#singleItemExecutors(...) creates the single-task executor. The implementation is as follows:

/**
 * Create a single-task executor
 *
 * @param name task executor name
 * @param workerCount number of worker threads in the task executor
 * @param processor task processor
 * @param acceptorExecutor task acceptor executor
 * @param <ID> task ID type
 * @param <T> task type
 * @return single-task executor
 */
static <ID, T> TaskExecutors<ID, T> singleItemExecutors(final String name,
                                                        int workerCount,
                                                        final TaskProcessor<T> processor,
                                                        final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    // Create the single-task executor
    return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {
        @Override
        public WorkerRunnable<ID, T> create(int idx) {
            return new SingleTaskWorkerRunnable<>("TaskNonBatchingWorker-" + name + '-' + idx /* thread name */, isShutdown, metrics, processor, acceptorExecutor);
        }
    }, workerCount, isShutdown);
}
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable: the single-task worker thread.

6.3 Worker thread abstract class

com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable is the abstract worker thread class. BatchWorkerRunnable and SingleTaskWorkerRunnable both extend it; they differ in their implementation of #run(). The WorkerRunnable code is as follows:

abstract static class WorkerRunnable<ID, T> implements Runnable {

    /**
     * Worker thread name
     */
    final String workerName;
    /**
     * Whether the executor is shut down
     */
    final AtomicBoolean isShutdown;
    final TaskExecutorMetrics metrics;
    /**
     * Task processor
     */
    final TaskProcessor<T> processor;
    /**
     * Task acceptor executor
     */
    final AcceptorExecutor<ID, T> taskDispatcher;

    // ... constructor and getters omitted
}

7. Network traffic shaper

com.netflix.eureka.util.batcher.TrafficShaper is the network traffic shaper. After a task fails with a congestion or transient error, it makes the AcceptorRunner delay submitting tasks to the work queue, so that tasks are not retried too soon. TrafficShaper is implemented as follows:

class TrafficShaper {

    /**
     * Upper bound on delay provided by configuration.
     */
    private static final long MAX_DELAY = 30 * 1000;

    /**
     * Retry delay after a request is rate limited, in milliseconds
     */
    private final long congestionRetryDelayMs;
    /**
     * Retry delay after a network failure, in milliseconds
     */
    private final long networkFailureRetryMs;

    /**
     * Timestamp of the last congestion error, in milliseconds
     */
    private volatile long lastCongestionError;
    /**
     * Timestamp of the last network failure, in milliseconds
     */
    private volatile long lastNetworkFailure;

    TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
        this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
        this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
    }

    void registerFailure(ProcessingResult processingResult) {
        if (processingResult == ProcessingResult.Congestion) {
            lastCongestionError = System.currentTimeMillis();
        } else if (processingResult == ProcessingResult.TransientError) {
            lastNetworkFailure = System.currentTimeMillis();
        }
    }

    /**
     * Calculates the submission delay, in milliseconds
     */
    long transmissionDelay() {
        // No delay
        if (lastCongestionError == -1 && lastNetworkFailure == -1) {
            return 0;
        }

        long now = System.currentTimeMillis();
        // Delay caused by the last congestion error
        if (lastCongestionError != -1) {
            long congestionDelay = now - lastCongestionError;
            if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) { // within the retry window
                return congestionRetryDelayMs - congestionDelay; // remaining delay
            }
            lastCongestionError = -1; // outside the window: reset
        }

        // Delay caused by the last network failure
        if (lastNetworkFailure != -1) {
            long failureDelay = now - lastNetworkFailure;
            if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) { // within the retry window
                return networkFailureRetryMs - failureDelay; // remaining delay
            }
            lastNetworkFailure = -1; // outside the window: reset
        }

        // No delay
        return 0;
    }
}
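The delay arithmetic is easier to follow with concrete numbers. The sketch below is a reduced variant written for illustration only: SimpleShaper is a hypothetical class that tracks a single failure kind and takes the current time as a parameter instead of reading the system clock, so the results are deterministic.

```java
/** Simplified shaper: one failure kind only, and the clock is passed in explicitly. */
class SimpleShaper {
    private final long retryDelayMs;
    private long lastFailure = -1; // -1 means "no failure recorded"

    SimpleShaper(long retryDelayMs) {
        this.retryDelayMs = retryDelayMs;
    }

    void registerFailure(long now) {
        lastFailure = now;
    }

    /** Remaining time to wait at `now`, or 0 once the retry window has passed. */
    long transmissionDelay(long now) {
        if (lastFailure == -1) {
            return 0;
        }
        long elapsed = now - lastFailure;
        if (elapsed >= 0 && elapsed < retryDelayMs) {
            return retryDelayMs - elapsed;
        }
        lastFailure = -1; // window expired: forget the failure
        return 0;
    }
}

class SimpleShaperDemo {
    public static void main(String[] args) {
        SimpleShaper shaper = new SimpleShaper(1000);
        System.out.println(shaper.transmissionDelay(5_000)); // prints 0
        shaper.registerFailure(5_000);
        System.out.println(shaper.transmissionDelay(5_300)); // prints 700
        System.out.println(shaper.transmissionDelay(6_000)); // prints 0
    }
}
```

With a 1000 ms retry delay, a failure at t = 5000 holds back scheduling until t = 6000; asking at t = 5300 yields the remaining 700 ms.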

8. Task acceptor executor [Processing tasks]

AcceptorExecutor#process(...) adds a task to the acceptor queue. The implementation is as follows:

// AcceptorExecutor.java
void process(ID id, T task, long expiryTime) {
   acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
   acceptedTasks++;
}
  • com.netflix.eureka.util.batcher.TaskHolder is the task holder. The implementation is as follows:

    class TaskHolder<ID, T> {

        /**
         * Task ID
         */
        private final ID id;
        /**
         * Task
         */
        private final T task;
        /**
         * Task expiry timestamp
         */
        private final long expiryTime;
        /**
         * Task submission timestamp
         */
        private final long submitTimestamp;

        // ... constructor and getters omitted
    }

9. Task acceptor thread [Scheduling tasks]

The background thread runs AcceptorRunner#run() to schedule tasks. The implementation is as follows:

 1: @Override
 2: public void run() {
 3:     long scheduleTime = 0;
 4:     while (!isShutdown.get()) {
 5:         try {
 6:             // Drain the input queues (acceptor queue + reprocess queue)
 7:             drainInputQueues();
 8:
 9:             // Number of pending tasks
10:             int totalItems = processingOrder.size();
11:
12:             // Calculate the schedule time
13:             long now = System.currentTimeMillis();
14:             if (scheduleTime < now) {
15:                 scheduleTime = now + trafficShaper.transmissionDelay();
16:             }
17:
18:             // Schedule
19:             if (scheduleTime <= now) {
20:                 // Schedule batch tasks
21:                 assignBatchWork();
22:                 // Schedule single tasks
23:                 assignSingleItemWork();
24:             }
25:
26:             // 1) The task executor has issued no work request because it is busy with previous tasks; or 2) scheduling is delayed. Sleep for 10 ms to avoid wasting resources.
27:             // If no worker is requesting data or there is a delay injected by the traffic shaper,
28:             // sleep for some time to avoid tight loop.
29:             if (totalItems == processingOrder.size()) {
30:                 Thread.sleep(10);
31:             }
32:         } catch (InterruptedException ex) {
33:             // Ignore
34:         } catch (Throwable e) {
35:             // Safe-guard, so we never exit this loop in an uncontrolled way.
36:             logger.warn("Discovery AcceptorThread error", e);
37:         }
38:     }
39: }
  • Line 4: schedules in an endless loop until the executor is shut down.
  • Lines 6 to 7: call #drainInputQueues() to drain the input queues (acceptor queue + reprocess queue) in a loop until there are pending tasks. The implementation is as follows:

     1: private void drainInputQueues() throws InterruptedException {
     2:     do {
     3:         // Drain the reprocess queue
     4:         drainReprocessQueue();
     5:         // Drain the acceptor queue
     6:         drainAcceptorQueue();
     7:
     8:         // All queues are empty: wait 10 ms to see whether a new task arrives in the acceptor queue
     9:         if (!isShutdown.get()) {
    10:             // If all queues are empty, block for a while on the acceptor queue
    11:             if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
    12:                 TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
    13:                 if (taskHolder != null) {
    14:                     appendTaskHolder(taskHolder);
    15:                 }
    16:             }
    17:         }
    18:     } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()); // until the input queues (acceptor queue + reprocess queue) are drained
    19: }
    • Line 2 and line 18: loop until all of the following conditions hold at the same time:
      • The reprocess queue (reprocessQueue) and the acceptor queue (acceptorQueue) are both empty.
      • The task map (pendingTasks) is not empty.
    • Lines 3 to 4: drain the reprocess queue (reprocessQueue). The implementation is as follows:

       1: private void drainReprocessQueue() {
       2:     long now = System.currentTimeMillis();
       3:     while (!reprocessQueue.isEmpty() && !isFull()) {
       4:         TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast(); // take the newest task first
       5:         ID id = taskHolder.getId();
       6:         if (taskHolder.getExpiryTime() <= now) { // expired
       7:             expiredTasks++;
       8:         } else if (pendingTasks.containsKey(id)) { // already present in the pending task map
       9:             overriddenTasks++;
      10:         } else {
      11:             pendingTasks.put(id, taskHolder);
      12:             processingOrder.addFirst(id); // add to the head of the pending queue
      13:         }
      14:     }
      15:     // If the pending queue is full, clear the reprocess queue
      16:     if (isFull()) {
      17:         queueOverflows += reprocessQueue.size();
      18:         reprocessQueue.clear();
      19:     }
      20: }
      • Line 4: tasks are taken from the tail of the queue first, so that the newer task for a given ID is the one kept in the pending task map (pendingTasks).
      • Line 12: adds the task ID to the head of the pending queue (processingOrder). The effect is shown in the figure below:
      • Lines 15 to 18: if the pending task map (pendingTasks) is full, clear the reprocess queue (reprocessQueue), discarding the older tasks.
    • Lines 5 to 6: drain the acceptor queue (acceptorQueue). The implementation is as follows:

      private void drainAcceptorQueue() {
          while (!acceptorQueue.isEmpty()) { // loop until the acceptor queue is empty
              appendTaskHolder(acceptorQueue.poll());
          }
      }

      private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
          // If the pending queue is full, remove the task at its head, discarding the older task
          if (isFull()) {
              pendingTasks.remove(processingOrder.poll());
              queueOverflows++;
          }
          // Add to the pending queue
          TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
          if (previousTask == null) {
              processingOrder.add(taskHolder.getId());
          } else {
              overriddenTasks++;
          }
      }
    • Lines 8 to 17: when all queues are empty, block on the acceptor queue (acceptorQueue) for up to 10 ms waiting for a task. If one arrives, add it to the pending queue (processingOrder).
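The combination of pollLast() and addFirst() in #drainReprocessQueue() can look odd at first. A stripped-down sketch helps: ReprocessOrderDemo is a hypothetical class that works on plain string IDs and leaves out the expiry, duplicate, and capacity checks, keeping only the two deque operations.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class ReprocessOrderDemo {
    /**
     * Moves every ID from reprocessQueue to the head of processingOrder,
     * taking the newest entry first, as the drain loop described above does.
     */
    static void drain(Deque<String> reprocessQueue, Deque<String> processingOrder) {
        while (!reprocessQueue.isEmpty()) {
            processingOrder.addFirst(reprocessQueue.pollLast());
        }
    }

    public static void main(String[] args) {
        Deque<String> reprocessQueue = new ArrayDeque<>(Arrays.asList("r1", "r2", "r3")); // r1 is the oldest
        Deque<String> processingOrder = new ArrayDeque<>(Arrays.asList("n1", "n2"));
        drain(reprocessQueue, processingOrder);
        System.out.println(processingOrder); // prints [r1, r2, r3, n1, n2]
    }
}
```

The retried tasks keep their relative order and end up ahead of the tasks that were already pending, so retries are scheduled before newer work.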

  • Lines 12 through 16: calculate the earliest time (scheduleTime) at which tasks may be scheduled.

    • When scheduleTime is earlier than the current time, it is recalculated as the current time plus TrafficShaper#transmissionDelay().
    • When scheduleTime is later than or equal to the current time, it is left unchanged.
  • Line 19: when scheduleTime is earlier than or equal to the current time, tasks are scheduled.
  • Line 21: calls #assignBatchWork() to schedule batch tasks. The implementation is as follows:

     1: void assignBatchWork() {
     2:     if (hasEnoughTasksForNextBatch()) {
     3:         // Acquire the batch task work request semaphore
     4:         if (batchWorkRequests.tryAcquire(1)) {
     5:             // Get the batch task
     6:             long now = System.currentTimeMillis();
     7:             int len = Math.min(maxBatchingSize, processingOrder.size());
     8:             List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
     9:             while (holders.size() < len && !processingOrder.isEmpty()) {
    10:                 ID id = processingOrder.poll();
    11:                 TaskHolder<ID, T> holder = pendingTasks.remove(id);
    12:                 if (holder.getExpiryTime() > now) { // not expired
    13:                     holders.add(holder);
    14:                 } else {
    15:                     expiredTasks++;
    16:                 }
    17:             }
    18:             //
    19:             if (holders.isEmpty()) { // no batch task was scheduled: release the request semaphore
    20:                 batchWorkRequests.release();
    21:             } else { // add the batch task to the batch task work queue
    22:                 batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
    23:                 batchWorkQueue.add(holders);
    24:             }
    25:         }
    26:     }
    27: }
    • Line 2: calls #hasEnoughTasksForNextBatch() to decide whether there are enough tasks to schedule the next batch: 1) the pending task map (pendingTasks) is full; or 2) the maximum batching delay has been reached. The implementation is as follows:

      private boolean hasEnoughTasksForNextBatch() {
          // The pending queue is empty
          if (processingOrder.isEmpty()) {
              return false;
          }
          // The pending task map is full
          if (pendingTasks.size() >= maxBufferSize) {
              return true;
          }
          // The task at the head of the pending queue has waited for the maximum batching delay
          TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
          long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
          return delay >= maxBatchingDelay;
      }
    • Lines 5 to 17: get the batch task (holders), which is simply a List<TaskHolder<ID, T>>. 😈

    • Line 4: acquire the batch task work request semaphore (batchWorkRequests). Each time a batch worker thread in the task executor is ready for work, it releases one batchWorkRequests permit, and each permit must be matched by exactly one batch task.
    • Lines 19 to 20: if no batch task was scheduled, release the request semaphore again, because the request has not actually been fulfilled. Each permit must be matched by exactly one batch task.
    • Lines 21 through 24: add the batch task to the batch task work queue.
  • Line 23: calls #assignSingleItemWork() to schedule a single task, similar to #assignBatchWork(). The implementation is as follows:

    void assignSingleItemWork() {
        if (!processingOrder.isEmpty()) { // the pending queue is not empty
            // Acquire the single-task work request semaphore
            if (singleItemWorkRequests.tryAcquire(1)) {
                // [Loop] Get a single task
                long now = System.currentTimeMillis();
                while (!processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    if (holder.getExpiryTime() > now) {
                        singleItemWorkQueue.add(holder);
                        return;
                    }
                    expiredTasks++;
                }
                // No single task was obtained: release the request semaphore
                singleItemWorkRequests.release();
            }
        }
    }
  • Lines 26 to 31: if the number of pending tasks before scheduling (totalItems) equals the current size of processingOrder, then either 1) the task executor has issued no work request because it is still busy with previous tasks, or 2) scheduling is being delayed. The thread sleeps for 10 milliseconds to avoid a tight loop that wastes resources.
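The batching rule from #hasEnoughTasksForNextBatch() boils down to "send when the buffer is full, or when the oldest task has waited long enough". Here is that rule as a standalone sketch; BatchTrigger and its parameter list are hypothetical, and the clock value is passed in so the behavior can be checked with fixed numbers.

```java
class BatchTrigger {
    /**
     * Decides whether a batch should be scheduled now.
     *
     * @param pendingCount       number of pending tasks
     * @param oldestSubmittedAt  submit timestamp of the task at the head of the queue, in ms
     * @param now                current time, in ms
     * @param maxBufferSize      capacity of the pending buffer
     * @param maxBatchingDelayMs longest time the head task may wait, in ms
     */
    static boolean hasEnoughTasks(int pendingCount, long oldestSubmittedAt, long now,
                                  int maxBufferSize, long maxBatchingDelayMs) {
        if (pendingCount == 0) {
            return false; // nothing to send
        }
        if (pendingCount >= maxBufferSize) {
            return true; // buffer is full: send immediately
        }
        return now - oldestSubmittedAt >= maxBatchingDelayMs; // head task has waited long enough
    }
}
```

For example, with a buffer of 100 and a delay of 500 ms, three tasks whose oldest entry was submitted 200 ms ago are held back, while the same three tasks are sent once 500 ms have passed.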

10. Task executor [Executing tasks]

10.1 Batch task worker threads

The batch task worker thread (BatchWorkerRunnable) runs #run() in the background to execute tasks. The implementation is as follows:

 1: @Override
 2: public void run() {
 3:     try {
 4:         while (!isShutdown.get()) {
 5:             // Get a batch task
 6:             List<TaskHolder<ID, T>> holders = getWork();
 7:
 8:             // TODO Taro: monitoring related, ignored for now
 9:             metrics.registerExpiryTimes(holders);
10:
11:             // Get the actual batch tasks
12:             List<T> tasks = getTasksOf(holders);
13:             // Process the batch tasks
14:             ProcessingResult result = processor.process(tasks);
15:             switch (result) {
16:                 case Success:
17:                     break;
18:                 case Congestion:
19:                 case TransientError:
20:                     taskDispatcher.reprocess(holders, result); // submit for reprocessing
21:                     break;
22:                 case PermanentError:
23:                     logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
24:             }
25:
26:             // TODO Taro: monitoring related, ignored for now
27:             metrics.registerTaskResult(result, tasks.size());
28:         }
29:     } catch (InterruptedException e) {
30:         // Ignore
31:     } catch (Throwable e) {
32:         // Safe-guard, so we never exit this loop in an uncontrolled way.
33:         logger.warn("Discovery WorkerThread error", e);
34:     }
35: }
  • Line 4: Loops indefinitely, executing tasks, until the executor is shut down.
  • Line 6: Calls the #getWork() method to obtain a batch of tasks, waiting until one is available. The implementation code is as follows:

     1: private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
     2:     // Initiate a request semaphore and obtain the work queue for batch tasks
     3:     BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
     4:     // [Loop] Obtain a batch of tasks until it succeeds
     5:     List<TaskHolder<ID, T>> result;
     6:     do {
     7:         result = workQueue.poll(1, TimeUnit.SECONDS);
     8:     } while (!isShutdown.get() && result == null);
     9:     return result;
    10: }
    • Line 3: Calls the AcceptorExecutor#requestWorkItems() method (the worker's taskDispatcher field is an AcceptorExecutor) to initiate a request semaphore and obtain the work queue for batch tasks. The implementation code is as follows:

      // AcceptorExecutor.java

      /**
       * Batch task work request semaphore
       */
      private final Semaphore batchWorkRequests = new Semaphore(0);
      /**
       * Batch task work queue
       */
      private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

      BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
          batchWorkRequests.release();
          return batchWorkQueue;
      }
      • Note that the batch task work queue (batchWorkQueue) and the single-task work queue (singleItemWorkQueue) are different queues.
    • Lines 5 to 8: Loops, polling the work queue with a 1-second timeout, until a batch is obtained or the executor is shut down.

  • Line 12: Calls the #getTasksOf(…) method to extract the actual tasks from the task holders. The implementation code is as follows:

    private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
        List<T> tasks = new ArrayList<>(holders.size());
        for (TaskHolder<ID, T> holder : holders) {
            tasks.add(holder.getTask());
        }
        return tasks;
    }
  • Lines 14 to 24: Calls the TaskProcessor to execute the tasks. When the result is Congestion or TransientError, it calls AcceptorExecutor#reprocess(…) to submit the whole batch for reprocessing; when the result is PermanentError, the batch is logged and discarded. The implementation code is as follows:

    // AcceptorExecutor.java
    void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
        // Add to the re-execution queue
        reprocessQueue.addAll(holders);

        // TODO taro: monitoring, disregard
        replayedTasks += holders.size();

        // Submit the task result to TrafficShaper
        trafficShaper.registerFailure(processingResult);
    }
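
The result handling walked through above can be boiled down to a small sketch. This is my own simplified illustration, not Eureka's code: the class BatchResultHandlingSketch, its handle(…) method and the discarded counter are hypothetical, and the enum is a local copy that only reuses the four result names seen in the switch statement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of how a worker reacts to a processing result.
class BatchResultHandlingSketch {

    // Local stand-in that reuses the four result names from the switch above.
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    // Batches that failed for a recoverable reason wait here to be retried.
    final List<String> reprocessQueue = new ArrayList<>();
    // Hypothetical counter; the code above only logs a warning for discarded batches.
    int discarded = 0;

    void handle(List<String> batch, ProcessingResult result) {
        switch (result) {
            case Success:
                break; // nothing more to do
            case Congestion:
            case TransientError:
                reprocessQueue.addAll(batch); // recoverable: retry the whole batch later
                break;
            case PermanentError:
                discarded += batch.size(); // unrecoverable: drop the whole batch
                break;
        }
    }

    public static void main(String[] args) {
        BatchResultHandlingSketch worker = new BatchResultHandlingSketch();
        worker.handle(Arrays.asList("a", "b"), ProcessingResult.TransientError);
        worker.handle(Arrays.asList("c"), ProcessingResult.PermanentError);
        worker.handle(Arrays.asList("d"), ProcessingResult.Success);
        System.out.println("retry=" + worker.reprocessQueue + " discarded=" + worker.discarded);
    }
}
```

Note that the retry decision is made for the batch as a whole: one recoverable failure sends every task in the batch back to the re-execution queue.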

10.2 Single-task worker thread

The single-task worker background thread (SingleTaskWorkerRunnable) runs its #run(…) method to execute the scheduled tasks. It is essentially the same as BatchWorkerRunnable#run(…), so I won't repeat the walkthrough. The implementation code is as follows:

// SingleTaskWorkerRunnable.java
@Override
public void run() {
    try {
        while (!isShutdown.get()) {
            // Initiate a request semaphore and obtain the single-task work queue
            BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
            // [Loop] Poll for a single task until one is obtained
            TaskHolder<ID, T> taskHolder;
            while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                if (isShutdown.get()) {
                    return;
                }
            }

            // TODO taro: monitoring related, ignored
            metrics.registerExpiryTime(taskHolder);

            if (taskHolder != null) {
                // Call the processor to execute the task
                ProcessingResult result = processor.process(taskHolder.getTask());
                switch (result) {
                    case Success:
                        break;
                    case Congestion:
                    case TransientError:
                        taskDispatcher.reprocess(taskHolder, result); // Submit for reprocessing
                        break;
                    case PermanentError:
                        logger.warn("Discarding a task of {} due to permanent error", workerName);
                }

                // TODO taro: monitoring related, ignored
                metrics.registerTaskResult(result, 1);
            }
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}
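
Both worker threads wait for work in the same way: poll the queue with a timeout and check the shutdown flag between attempts, so that a shutdown is noticed within one timeout period. Below is a minimal sketch of that pattern. The class PollUntilShutdownSketch and the helper pollUntilShutdown(…) are hypothetical names of mine, and unlike the code above the helper swallows the interrupt and returns null, purely to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper showing the "poll with timeout until work arrives or shutdown" loop.
class PollUntilShutdownSketch {

    // Returns the next item, or null if the shutdown flag is set (or the thread is interrupted).
    static <E> E pollUntilShutdown(BlockingQueue<E> queue, AtomicBoolean isShutdown, long timeoutMillis) {
        try {
            E item;
            while ((item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) == null) {
                if (isShutdown.get()) {
                    return null; // stop waiting once shutdown has been requested
                }
            }
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean isShutdown = new AtomicBoolean(false);

        queue.add("task-1");
        System.out.println(pollUntilShutdown(queue, isShutdown, 10)); // an item is available immediately

        isShutdown.set(true);
        System.out.println(pollUntilShutdown(queue, isShutdown, 10)); // empty queue plus shutdown
    }
}
```

The timeout is what keeps the thread responsive: a plain take() would block forever on an empty queue and never see the shutdown flag unless the thread were interrupted.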

666. The eggs

😈 Another long article. I suggest reading the code side by side with the overall flow chart; it is actually not hard to understand.

Of course, if you have any questions, you are welcome to leave a message on my WeChat public account (Taro source).

Friends, how about sharing my WeChat public account (Taro source) with your friends?