Preface
This article walks through Eureka's TaskDispatcher.
PeerEurekaNode
public class PeerEurekaNode {

    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                          HttpReplicationClient replicationClient, EurekaServerConfig config) {
        this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS,
                RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
    }

    /* For testing */
    PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                   HttpReplicationClient replicationClient, EurekaServerConfig config,
                   int batchSize, long maxBatchingDelayMs,
                   long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;

        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = config.getMaxTimeForReplication();

        String batcherName = getBatcherName();
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                batcherName,
                config.getMaxElementsInPeerReplicationPool(),
                batchSize,
                config.getMaxThreadsForPeerReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                targetHost,
                config.getMaxElementsInStatusReplicationPool(),
                config.getMaxThreadsForStatusReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
    }

    //...
}
- statusUpdate
/**
 * Send the status information of the ASG represented by the instance.
 *
 * <p>
 * ASG (Autoscaling group) names are available for instances in AWS and the
 * ASG information is used for determining if the instance should be
 * registered as {@link InstanceStatus#DOWN} or {@link InstanceStatus#UP}.
 *
 * @param asgName
 *            the asg name if any of this instance.
 * @param newStatus
 *            the new status of the ASG.
 */
public void statusUpdate(final String asgName, final ASGStatus newStatus) {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    nonBatchingDispatcher.process(
            asgName,
            new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
                public EurekaHttpResponse<?> execute() {
                    return replicationClient.statusUpdate(asgName, newStatus);
                }
            },
            expiryTime
    );
}
statusUpdate submits its task to nonBatchingDispatcher.
- cancel
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}
Methods such as cancel submit their tasks to batchingDispatcher.
ReplicationTask
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/ReplicationTask.java
/**
 * Base class for all replication tasks.
 */
abstract class ReplicationTask {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationTask.class);

    protected final String peerNodeName;
    protected final Action action;

    ReplicationTask(String peerNodeName, Action action) {
        this.peerNodeName = peerNodeName;
        this.action = action;
    }

    public abstract String getTaskName();

    public Action getAction() {
        return action;
    }

    public abstract EurekaHttpResponse<?> execute() throws Throwable;

    public void handleSuccess() {
    }

    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        logger.warn("The replication of task {} failed with response code {}", getTaskName(), statusCode);
    }
}
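ReplicationTask is the base class for all replication tasks. Subclasses must supply getTaskName() and execute(), and may override handleSuccess() and handleFailure().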
InstanceReplicationTask
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/InstanceReplicationTask.java
/**
 * Base {@link ReplicationTask} class for instance related replication requests.
 *
 * @author Tomasz Bak
 */
public abstract class InstanceReplicationTask extends ReplicationTask {

    /**
     * For cancel request there may be no InstanceInfo object available so we need to store app/id pair
     * explicitly.
     */
    private final String appName;
    private final String id;

    private final InstanceInfo instanceInfo;
    private final InstanceStatus overriddenStatus;

    private final boolean replicateInstanceInfo;

    //...
}
InstanceReplicationTask is the base class for instance-related replication tasks. PeerEurekaNode uses it for register, heartbeat, the per-instance statusUpdate, deleteStatusOverride, and cancel, and submits all of these to batchingDispatcher. The ASG statusUpdate shown earlier is the exception: it uses AsgReplicationTask and is submitted to nonBatchingDispatcher.
TaskDispatcher
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatcher.java
/**
* Task dispatcher takes task from clients, and delegates their execution to a configurable number of workers.
* The task can be processed one at a time or in batches. Only non-expired tasks are executed, and if a newer
* task with the same id is scheduled for execution, the old one is deleted. Lazy dispatch of work (only on demand)
* to workers, guarantees that data are always up to date, and no stale task processing takes place.
* <h3>Task processor</h3>
* A client of this component must provide an implementation of {@link TaskProcessor} interface, which will do
* the actual work of task processing. This implementation must be thread safe, as it is called concurrently by
* multiple threads.
* <h3>Execution modes</h3>
* To create non batched executor call {@link TaskDispatchers#createNonBatchingTaskDispatcher(String, int, int, long, long, TaskProcessor)}
* method. Batched executor is created by {@link TaskDispatchers#createBatchingTaskDispatcher(String, int, int, int, long, long, TaskProcessor)}.
*
* @author Tomasz Bak
*/
public interface TaskDispatcher<ID, T> {

    void process(ID id, T task, long expiryTime);

    void shutdown();
}
TaskDispatcher hands tasks over to a pool of workers. Its two key guarantees are that only tasks that have not expired are executed, and that a newer task with the same ID replaces the older one that is still waiting. There are two kinds of dispatcher, non-batching and batching (nonBatchingDispatcher and batchingDispatcher in PeerEurekaNode).
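The two guarantees from the Javadoc (replace by ID, skip expired tasks) can be illustrated with a minimal single-threaded sketch. PendingTaskBuffer, submit and next are hypothetical names chosen for this illustration and are not part of Eureka; tasks are plain strings to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not Eureka's implementation. */
final class PendingTaskBuffer {

    private static final class Holder {
        final String task;
        final long expiryTime;

        Holder(String task, long expiryTime) {
            this.task = task;
            this.expiryTime = expiryTime;
        }
    }

    private final Map<String, Holder> pending = new HashMap<>();
    private final Deque<String> order = new ArrayDeque<>();

    /** A task whose ID is already pending replaces the old task but keeps its queue position. */
    void submit(String id, String task, long expiryTime) {
        if (pending.put(id, new Holder(task, expiryTime)) == null) {
            order.add(id);
        }
    }

    /** Returns the next task that has not expired at time now, or null if none is left. */
    String next(long now) {
        while (!order.isEmpty()) {
            Holder holder = pending.remove(order.poll());
            if (holder.expiryTime > now) {
                return holder.task;
            }
            // Expired tasks are dropped silently and the loop moves on.
        }
        return null;
    }
}
```

Submitting "a" twice and then asking for the next task yields only the second version of "a"; a task whose expiry time has already passed is never returned.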
TaskDispatchers
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/TaskDispatchers.java
public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                                int maxBufferSize,
                                                                                int workerCount,
                                                                                long maxBatchingDelay,
                                                                                long congestionRetryDelayMs,
                                                                                long networkFailureRetryMs,
                                                                                TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }

    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workloadSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}
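TaskDispatchers provides two factory methods, one for a non-batching dispatcher and one for a batching dispatcher. The non-batching one creates its AcceptorExecutor with a maxBatchingSize of 1 and its workers with TaskExecutors.singleItemExecutors. The batching one passes workloadSize as maxBatchingSize (PeerEurekaNode supplies BATCH_SIZE, 250 by default) and creates its workers with TaskExecutors.batchExecutors.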
AcceptorExecutor
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/AcceptorExecutor.java
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();

void process(ID id, T task, long expiryTime) {
    acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
    acceptedTasks++;
}

void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
    reprocessQueue.addAll(holders);
    replayedTasks += holders.size();
    trafficShaper.registerFailure(processingResult);
}

void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
    reprocessQueue.add(taskHolder);
    replayedTasks++;
    trafficShaper.registerFailure(processingResult);
}
process puts new tasks into acceptorQueue; reprocess puts failed tasks into reprocessQueue and reports the failure to the traffic shaper.
AcceptorRunner
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
this.acceptorThread.setDaemon(true);
this.acceptorThread.start();
class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                drainInputQueues();

                int totalItems = processingOrder.size();

                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    assignBatchWork();
                    assignSingleItemWork();
                }

                // If no worker is requesting data or there is a delay injected by the traffic shaper,
                // sleep for some time to avoid tight loop.
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }

    //...
}
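Each iteration of the loop first calls drainInputQueues and then, once any delay requested by the traffic shaper has elapsed, calls assignBatchWork and assignSingleItemWork.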
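The two scheduleTime checks in the loop act as a gate in front of the assign calls. The sketch below isolates that gate so it can be tried with fixed timestamps. ScheduleGate and mayAssign are hypothetical names, and the transmissionDelay parameter stands in for the value returned by trafficShaper.transmissionDelay().

```java
/** Hypothetical illustration of the scheduleTime gate in AcceptorRunner.run(). */
final class ScheduleGate {

    private long scheduleTime = 0;

    /**
     * Applies the same two checks as the loop body and reports whether
     * work may be assigned at time now (both values in milliseconds).
     */
    boolean mayAssign(long now, long transmissionDelay) {
        if (scheduleTime < now) {
            // The previous schedule has passed: plan the next slot.
            scheduleTime = now + transmissionDelay;
        }
        // With a zero delay the slot is now, so assignment proceeds immediately.
        return scheduleTime <= now;
    }
}
```

With a zero delay the gate opens on every call. With a positive delay it stays closed until the planned slot is reached, which is when the loop falls back to its 10 ms sleep.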
drainInputQueues
private void drainInputQueues() throws InterruptedException {
    do {
        drainReprocessQueue();
        drainAcceptorQueue();

        if (!isShutdown.get()) {
            // If all queues are empty, block for a while on the acceptor queue
            if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                if (taskHolder != null) {
                    appendTaskHolder(taskHolder);
                }
            }
        }
    } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
}
drainInputQueues calls drainReprocessQueue and then drainAcceptorQueue, repeating until both input queues are empty and pendingTasks holds at least one task.
- drainAcceptorQueue
private void drainAcceptorQueue() {
    while (!acceptorQueue.isEmpty()) {
        appendTaskHolder(acceptorQueue.poll());
    }
}

private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
    if (isFull()) {
        pendingTasks.remove(processingOrder.poll());
        queueOverflows++;
    }
    TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
    if (previousTask == null) {
        processingOrder.add(taskHolder.getId());
    } else {
        overriddenTasks++;
    }
}
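drainAcceptorQueue moves every task from acceptorQueue into pendingTasks, which is keyed by task ID, and appends new IDs to the tail of processingOrder. If the buffer is full, the oldest pending task is dropped first. If the ID is already pending, the new task replaces the old one and keeps its position.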
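The overflow and override handling in appendTaskHolder can be tried in isolation with the sketch below. BoundedPendingTasks is a hypothetical stand-in that uses strings for IDs and tasks. The excerpt does not show isFull(), so the size check used here is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the pendingTasks / processingOrder pair. */
final class BoundedPendingTasks {

    private final int maxBufferSize;
    private final Map<String, String> pendingTasks = new HashMap<>();
    private final Deque<String> processingOrder = new ArrayDeque<>();
    int queueOverflows;
    int overriddenTasks;

    BoundedPendingTasks(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    void append(String id, String task) {
        // Assumed meaning of isFull(): the map has reached the configured size.
        if (pendingTasks.size() >= maxBufferSize) {
            pendingTasks.remove(processingOrder.poll()); // drop the oldest pending task
            queueOverflows++;
        }
        String previous = pendingTasks.put(id, task);
        if (previous == null) {
            processingOrder.add(id);  // new ID: goes to the tail
        } else {
            overriddenTasks++;        // known ID: replaced, position unchanged
        }
    }

    /** Snapshot of the IDs in processing order, head first. */
    List<String> ids() {
        return new ArrayList<>(processingOrder);
    }

    String task(String id) {
        return pendingTasks.get(id);
    }
}
```

Note that the eviction check runs before the override check, so a full buffer loses its oldest entry even when the incoming task only replaces an ID that is already pending.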
- drainReprocessQueue
private void drainReprocessQueue() {
    long now = System.currentTimeMillis();
    while (!reprocessQueue.isEmpty() && !isFull()) {
        TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
        ID id = taskHolder.getId();
        if (taskHolder.getExpiryTime() <= now) {
            expiredTasks++;
        } else if (pendingTasks.containsKey(id)) {
            overriddenTasks++;
        } else {
            pendingTasks.put(id, taskHolder);
            processingOrder.addFirst(id);
        }
    }
    if (isFull()) {
        queueOverflows += reprocessQueue.size();
        reprocessQueue.clear();
    }
}
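drainReprocessQueue takes tasks from the tail of reprocessQueue. Each task that has not expired and whose ID is not already pending is put into pendingTasks, and processingOrder.addFirst(id) moves it to the head of the processing order. Expired tasks and tasks superseded by a newer one with the same ID are dropped, and when the buffer is full whatever remains in reprocessQueue is discarded.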
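Combining pollLast with addFirst puts retried tasks ahead of new ones while keeping the retried tasks in their original relative order. The sketch below shows only that ordering effect; the expiry, duplicate and capacity checks are left out, and ReprocessOrderDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of the pollLast / addFirst ordering only. */
final class ReprocessOrderDemo {

    /**
     * Moves every ID from reprocessQueue to the head of processingOrder
     * and returns the resulting order, head first.
     */
    static List<String> requeue(Deque<String> reprocessQueue, Deque<String> processingOrder) {
        while (!reprocessQueue.isEmpty()) {
            // The newest retry is moved first, so the oldest retry ends up at the very head.
            processingOrder.addFirst(reprocessQueue.pollLast());
        }
        return new ArrayList<>(processingOrder);
    }
}
```

Requeuing r1 and r2 in front of n1 and n2 gives the order r1, r2, n1, n2.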
assign work
void assignSingleItemWork() {
    if (!processingOrder.isEmpty()) {
        if (singleItemWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            while (!processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    singleItemWorkQueue.add(holder);
                    return;
                }
                expiredTasks++;
            }
            singleItemWorkRequests.release();
        }
    }
}

void assignBatchWork() {
    if (hasEnoughTasksForNextBatch()) {
        if (batchWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            int len = Math.min(maxBatchingSize, processingOrder.size());
            List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
            while (holders.size() < len && !processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    holders.add(holder);
                } else {
                    expiredTasks++;
                }
            }
            if (holders.isEmpty()) {
                batchWorkRequests.release();
            } else {
                batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                batchWorkQueue.add(holders);
            }
        }
    }
}
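assignSingleItemWork and assignBatchWork take IDs from the head of processingOrder, remove the matching holders from pendingTasks, and place those that have not expired in singleItemWorkQueue or batchWorkQueue. Work is handed over only when tryAcquire succeeds on the corresponding request semaphore; if every candidate turns out to be expired, the permit is released again.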
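The semaphore is what makes the dispatch lazy: the acceptor hands over a task only after a worker has asked for one. The sketch below shows the hand-off for the single-item case. OnDemandHandOff is a hypothetical class, and the worker-side requestWorkItem, which releases a permit before returning the queue, is an assumption because the excerpt does not show that method's body.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical illustration of on-demand hand-off; tasks are plain strings. */
final class OnDemandHandOff {

    private final Semaphore workRequests = new Semaphore(0);
    private final BlockingQueue<String> workQueue = new LinkedBlockingQueue<>();
    // Touched only by the acceptor side, so a plain deque is enough here.
    private final Deque<String> processingOrder = new ArrayDeque<>();

    void enqueue(String task) {
        processingOrder.add(task);
    }

    /** Worker side (assumed behaviour): signal readiness, then wait on the returned queue. */
    BlockingQueue<String> requestWorkItem() {
        workRequests.release();
        return workQueue;
    }

    /** Acceptor side: hand over one task only if some worker has asked for it. */
    boolean assignSingleItemWork() {
        if (!processingOrder.isEmpty() && workRequests.tryAcquire(1)) {
            workQueue.add(processingOrder.poll());
            return true;
        }
        return false;
    }
}
```

Until a worker calls requestWorkItem, the task stays in the pending structures, where a newer task with the same ID can still replace it.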
WorkerRunnable
abstract static class WorkerRunnable<ID, T> implements Runnable {
    final String workerName;
    final AtomicBoolean isShutdown;
    final TaskExecutorMetrics metrics;
    final TaskProcessor<T> processor;
    final AcceptorExecutor<ID, T> taskDispatcher;

    WorkerRunnable(String workerName,
                   AtomicBoolean isShutdown,
                   TaskExecutorMetrics metrics,
                   TaskProcessor<T> processor,
                   AcceptorExecutor<ID, T> taskDispatcher) {
        this.workerName = workerName;
        this.isShutdown = isShutdown;
        this.metrics = metrics;
        this.processor = processor;
        this.taskDispatcher = taskDispatcher;
    }

    String getWorkerName() {
        return workerName;
    }
}
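WorkerRunnable is the abstract base class of the worker Runnables. It holds the state both workers share: the worker name, the shutdown flag, the metrics, the TaskProcessor, and the AcceptorExecutor.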
SingleTaskWorkerRunnable
private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

    SingleTaskWorkerRunnable(String workerName,
                             AtomicBoolean isShutdown,
                             TaskExecutorMetrics metrics,
                             TaskProcessor<T> processor,
                             AcceptorExecutor<ID, T> acceptorExecutor) {
        super(workerName, isShutdown, metrics, processor, acceptorExecutor);
    }

    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                TaskHolder<ID, T> taskHolder;
                while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                    if (isShutdown.get()) {
                        return;
                    }
                }
                metrics.registerExpiryTime(taskHolder);
                if (taskHolder != null) {
                    ProcessingResult result = processor.process(taskHolder.getTask());
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(taskHolder, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding a task of {} due to permanent error", workerName);
                    }
                    metrics.registerTaskResult(result, 1);
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }
}
SingleTaskWorkerRunnable requests singleItemWorkQueue, a BlockingQueue<TaskHolder<ID, T>>, and polls one task at a time from it.
BatchWorkerRunnable
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

    BatchWorkerRunnable(String workerName,
                        AtomicBoolean isShutdown,
                        TaskExecutorMetrics metrics,
                        TaskProcessor<T> processor,
                        AcceptorExecutor<ID, T> acceptorExecutor) {
        super(workerName, isShutdown, metrics, processor, acceptorExecutor);
    }

    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                List<TaskHolder<ID, T>> holders = getWork();
                metrics.registerExpiryTimes(holders);

                List<T> tasks = getTasksOf(holders);
                ProcessingResult result = processor.process(tasks);
                switch (result) {
                    case Success:
                        break;
                    case Congestion:
                    case TransientError:
                        taskDispatcher.reprocess(holders, result);
                        break;
                    case PermanentError:
                        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                }
                metrics.registerTaskResult(result, tasks.size());
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }

    private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
        BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
        List<TaskHolder<ID, T>> result;
        do {
            result = workQueue.poll(1, TimeUnit.SECONDS);
        } while (!isShutdown.get() && result == null);
        return (result == null) ? new ArrayList<>() : result;
    }

    private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
        List<T> tasks = new ArrayList<>(holders.size());
        for (TaskHolder<ID, T> holder : holders) {
            tasks.add(holder.getTask());
        }
        return tasks;
    }
}
BatchWorkerRunnable requests batchWorkQueue, a BlockingQueue<List<TaskHolder<ID, T>>>, and polls one batch of tasks at a time from it.
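Both workers wait for work with a timed poll rather than a blocking take, so that they re-check the shutdown flag at least once per timeout. A minimal sketch of that pattern follows. TimedPoll and pollUntilShutdown are hypothetical names, the timeout is a parameter instead of the fixed one second used above, and, unlike the code above, the sketch handles interruption itself by returning null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of polling with a timeout so shutdown is noticed. */
final class TimedPoll {

    /**
     * Waits for an item and re-checks the shutdown flag after every timeout.
     * Returns null if shutdown was requested (or the thread was interrupted)
     * before an item arrived.
     */
    static <T> T pollUntilShutdown(BlockingQueue<T> queue, AtomicBoolean isShutdown, long timeoutMs) {
        try {
            T result;
            do {
                result = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } while (!isShutdown.get() && result == null);
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
    }
}
```

The shorter the timeout, the faster a worker reacts to shutdown, at the price of waking up more often while idle.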
The logic for ProcessingResult is the same for both, as follows:
switch (result) {
    case Success:
        break;
    case Congestion:
    case TransientError:
        taskDispatcher.reprocess(holders, result);
        break;
    case PermanentError:
        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
On Congestion or TransientError the tasks are handed back through reprocess for another attempt; on PermanentError they are discarded and a warning is logged.
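The retry decision can be exercised on its own with the sketch below. ResultHandling and handle are hypothetical names, the enum is a local copy of the four outcomes so the example compiles without Eureka on the classpath, and a plain Deque stands in for the reprocess queue.

```java
import java.util.Deque;

/** Hypothetical illustration of the retry decision; not Eureka's code. */
final class ResultHandling {

    /** Local copy of the four outcomes used in the switch statement. */
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    /** Returns true if the task was queued for another attempt. */
    static <T> boolean handle(T task, ProcessingResult result, Deque<T> retryQueue) {
        switch (result) {
            case Congestion:
            case TransientError:
                retryQueue.add(task); // both are treated as recoverable: try again later
                return true;
            case PermanentError:
                System.err.println("Discarding task " + task + " due to permanent error");
                return false;
            default: // Success: nothing left to do
                return false;
        }
    }
}
```

Only the two recoverable outcomes ever reach the retry queue, so a permanently failing task cannot loop forever.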
Summary
Eureka implements its own TaskDispatcher, in a non-batching and a batching variant (nonBatchingDispatcher and batchingDispatcher in PeerEurekaNode).
The dispatched tasks are subclasses of ReplicationTask, such as InstanceReplicationTask, which add the basic properties. ReplicationTask declares two abstract methods, public abstract String getTaskName() and public abstract EurekaHttpResponse<?> execute() throws Throwable. PeerEurekaNode creates anonymous subclasses that implement execute() with the corresponding request logic, such as register and cancel.
Scheduling is keyed by task ID and ordered by priority. A newer task with the same ID replaces the pending task that has not yet been dispatched. If processing fails with Congestion or TransientError, the tasks go to the retry queue (reprocessQueue) and are then put back into pendingTasks at the head of the processing order, that is, with the highest priority.