This is the 15th day of my participation in the August Text Challenge.More challenges in August
Master-worker mode is a common high-concurrency mode. Its core idea is the separation of task scheduling and execution. The role of the scheduling task is Master, and the role of the executing task is Worker, and Master is responsible for receiving and allocating tasks and merging task results. The Worker process is responsible for executing tasks. Master-worker mode is a merge type mode.
The Master – Worker architecture
-
Master has two attributes, one is to store the tasks to be executed, and the other is the task scheduling thread, which obtains the tasks from the task queue and sends them to the worker thread.
-
Worker also has two attributes, one is to store the task to be executed, and the other is the thread executing the task. The thread executing the task constantly obtains and executes the task from the blocking queue.
Code implementation
Because it involves code encapsulation, all code ends up in the same package, using package-specific access rights to restrict access to some methods, allowing access only to classes within the package.
Task encapsulation
-
The core properties of a Task are taskId,workerId, the result of the task’s execution, and asynchronous callbacks to the result.
-
Specific tasks are given to the user through the abstract method exec
-
Each task generates a unique taskId via WORKER_ID_GENERATOR
public abstract class Task<R> {
private static final AtomicInteger WORKER_ID_GENERATOR = new AtomicInteger(0);
/** * Task ID */
private final int taskId;
/** * The Id of the work thread executing the task */
@Setter
private int workerId;
/**
* 任务执行结果
*/
private R result;
/** * Callback to the result */ after the task is executed
private Consumer<R> resultAction;
public Task(a) {
this.taskId = WORKER_ID_GENERATOR.getAndIncrement();
}
/** * Execute the task, execute the task and call back */
void execute(a) {
this.result = this.exec();
if(resultAction ! =null) {
resultAction.accept(this.result);
}
System.out.println(toString());
}
/** * Execute the task **@return R
*/
protected abstract R exec(a);
@Override
public String toString(a) {
return "Task{" +
"taskId=" + taskId +
", workerId=" + workerId +
", result=" + result +
'} ';
}
void setResultAction(Consumer<R> resultAction) {
this.resultAction = resultAction; }}Copy the code
Worker
The Worker receives tasks assigned by the Master and also caches local tasks through blocking queues. As the consumers of the blocking queue of local tasks, the thread owned by the Worker constantly obtains and executes tasks from the blocking queue, and calls the callback function passed by the Master after completion of execution.
public class Worker<R> {
private final ArrayBlockingQueue<Task<R>> taskQueue = new ArrayBlockingQueue<>(32);
private static final AtomicInteger WORKER_ID_GENERATOR = new AtomicInteger();
private final int workerId;
private final Thread workerThread;
public Worker(a) {
this.workerId = WORKER_ID_GENERATOR.getAndIncrement();
workerThread = new Thread(this::execute);
workerThread.start();
}
private void execute(a) {
while (true) {
try {
Task<R> task = taskQueue.take();
task.setWorkerId(workerId);
task.execute();
} catch(InterruptedException e) { e.printStackTrace(); }}}public void submit(Task<R> task) { taskQueue.offer(task); }}Copy the code
Master
The Master is responsible for receiving the tasks submitted by the client and then caching them through a blocking queue. As the consumers of the blocking queue, the thread owned by the Master constantly obtains tasks from the blocking queue and distributes them to the Worker in turn.
public class Master<T extends Task<R>, R> {
private final Map<String, Worker<R>> workerMap = new HashMap<>();
private final ArrayBlockingQueue<Task<R>> taskQueue = new ArrayBlockingQueue<>(1024);
private Thread masterThread = null;
public Master(int workCount) {
for (int i = 0; i < workCount; i++) {
Worker<R> worker = new Worker<>();
workerMap.put(String.format("%s-%s"."worker", i), worker);
}
masterThread = new Thread(this::execute);
masterThread.start();
}
private void execute(a) {
while (true) {
workerMap.forEach((workerName, worker) -> {
try {
Task<R> task = this.taskQueue.take();
worker.submit(task);
} catch(InterruptedException e) { e.printStackTrace(); }}); }}public void submit(T task, Consumer<R> resultAction) { task.setResultAction(resultAction); taskQueue.add(task); }}Copy the code
Case of actual combat
Accumulative 1 ~ 1000000
Analysis, the sum can be divided into many sub-tasks, each task is responsible for a part of the sum, the results of each sub-task are merged.
- Task split
public class NumAddTask extends Task<Long> {
private final long init;
public NumAddTask(long init) {
this.init = init;
}
@Override
protected Long exec(a) {
long res = 0;
// Each thread adds up to 100,000
for (int i = 0; i < 100000; i++) {
res += i + init;
}
returnres; }}Copy the code
- Need to implement
public class MasterWorkerTest {
@Test
public void test(a) throws InterruptedException {
AtomicLong totalCount = new AtomicLong(0);
Master<NumAddTask, Long> master = new Master<>(4);
master.submit(new NumAddTask(1), totalCount::addAndGet);
master.submit(new NumAddTask(100001), totalCount::addAndGet);
master.submit(new NumAddTask(200001), totalCount::addAndGet);
master.submit(new NumAddTask(300001), totalCount::addAndGet);
master.submit(new NumAddTask(400001), totalCount::addAndGet);
master.submit(new NumAddTask(500001), totalCount::addAndGet);
master.submit(new NumAddTask(600001), totalCount::addAndGet);
master.submit(new NumAddTask(700001), totalCount::addAndGet);
master.submit(new NumAddTask(800001), totalCount::addAndGet);
master.submit(new NumAddTask(900001), totalCount::addAndGet);
// Prevent the main thread from exiting
Thread.sleep(3000);
System.out.println("total Count = "+ totalCount); }}Copy the code
- The execution result of each task and its total result
Task{taskId=0, workerId=2, result=5000050000}
Task{taskId=1, workerId=1, result=15000050000}
Task{taskId=2, workerId=0, result=25000050000}
Task{taskId=3, workerId=3, result=35000050000}
Task{taskId=7, workerId=3, result=75000050000}
Task{taskId=4, workerId=2, result=45000050000}
Task{taskId=5, workerId=1, result=55000050000}
Task{taskId=6, workerId=0, result=65000050000}
Task{taskId=8, workerId=2, result=85000050000}
Task{taskId=9, workerId=1, result=95000050000}
total Count = 500000500000
Copy the code
Master-worker mode in Netty
-
High performance transmission model — Reactor model is an application of master-worker model in the field of transmission.
-
In Netty, there is a thread inside the EventLoop reactor that polls the Java NIO selector for events and then distributes the corresponding events. The target of event Dispatch is Netty’s Handler Handler (including user-defined business handlers).
-
The Netty server program needs to set two EventLoopGroup polling groups. One group is responsible for listening and receiving new connections, and the other group is responsible for polling and distributing I/O transmission events. The two polling groups are responsible for the following:
- The EventLoopGroup that listens to and receives new connections polls the Reactor in the Reactor group to query IO events of new connections on the query channel.
- The Reactor in the other polling group completes the IO event query for all subchannels and executes the corresponding Handler Handler for IO processing, such as data input and output.
-
Netty is not only based on Reactor model but also reflects the idea of master-worker model. Netty EventLoop (Reactor role) can correspond to Worker role in master-worker mode. The Netty EventLoopGroup polling group can correspond to the Master role in master-worker mode