0x00 Overview
Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of the real-time computing engine Flink. It is the first machine learning platform in the industry to support both batch and streaming algorithms.

This article walks you through the implementation of the AllReduce communication model in Alink. AllReduce is used widely in Alink, for example in KMeans, LDA, Word2Vec, GD, LBFGS, Newton method, OWLQN, SGD, GBDT, and random forest.

Since very little public information about Alink is available, what follows is largely my own deduction and will certainly contain omissions. Please point them out and I will update the article accordingly.
0x01 What is MPI
The Message Passing Interface (MPI) is a cross-language communication protocol for writing parallel programs; it supports both point-to-point and broadcast communication.

MPI aims at high performance, large scale, and portability, and it remains the dominant model in high-performance computing today.
Its characteristics are:

- A partitioned address space. Each process can read non-local data only through API calls, and every non-local memory interaction needs to be coordinated (a handshake).
- Explicit parallelization only. The user must specify explicitly how messages are passed.
AllReduce is a basic primitive provided by MPI; to understand AllReduce better, we first need to understand Reduce.
- Reduction function MPI_Reduce: reduction is a classic concept from functional programming. It takes the same variable from every process in the communicator, combines them in a reduction computation, and delivers the result to a designated process. For example, a reduction can merge a batch of data into a smaller batch, or an addition function can reduce the elements of an array to a single number.
- Reduction-plus-broadcast function MPI_Allreduce: on top of the reduction computation, the result is distributed to every process. For example, after the reduction produces a value, that value is sent to each process, so every process in the parallel computation knows it.

One difference between MPI_Allreduce and MPI_Reduce is that MPI_Reduce delivers the final result only to the specified destination process, while MPI_Allreduce delivers it to all processes, so that every process receives the result. The prototype of MPI_Allreduce therefore does not need a destination process argument.
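To make the difference concrete, below is a minimal single-JVM sketch, in plain Java rather than real MPI (there are no processes here; ranks are simulated by array indices), of what the two primitives leave on each rank:

```java
import java.util.Arrays;

public class ReduceVsAllReduce {
    public static void main(String[] args) {
        // Each "process" holds one partial value, indexed by rank.
        double[] partial = {1.0, 2.0, 3.0, 4.0};
        double sum = Arrays.stream(partial).sum();

        // MPI_Reduce with op=SUM, root=0: only rank 0 ends up with the result.
        double[] afterReduce = new double[partial.length];
        afterReduce[0] = sum;                      // result lands on the root only

        // MPI_Allreduce with op=SUM: every rank ends up with the result.
        double[] afterAllReduce = new double[partial.length];
        Arrays.fill(afterAllReduce, sum);          // result is known to all ranks

        System.out.println(Arrays.toString(afterReduce));    // [10.0, 0.0, 0.0, 0.0]
        System.out.println(Arrays.toString(afterAllReduce)); // [10.0, 10.0, 10.0, 10.0]
    }
}
```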
0x02 How Alink Implements the Idea of MPI
As noted above, AllReduce is used widely across Alink's algorithms, and it plays a key role in their implementation: it deliberately interrupts tasks that are running in parallel, aggregates and redistributes their partial results, and then lets the interrupted execution continue, somewhat like the Barrier familiar from concurrent programming.
Comparing with Flink's native KMeans algorithm, we can see that AllReduce roughly corresponds to groupBy(0).reduce: the groupBy operation cannot run until all of its input data has been generated.
```java
DataSet<Centroid> newCentroids = points
    // compute closest centroid for each point
    .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
    // count and sum point coordinates for each centroid
    .map(new CountAppender())
    .groupBy(0).reduce(new CentroidAccumulator())
    // compute new centroids from point counts and coordinate sums
    .map(new CentroidAverager());
```
From AllReduce's source comments we can clearly see Alink's approach to implementing MPI:
```java
/**
 * An implement of {@link CommunicateFunction} that does the AllReduce.
 *
 * AllReduce is a communication primitive widely used in MPI. In our implementation,
 * all workers do reduce on a partition of the whole data and they all get the final
 * reduce result.
 *
 * There're mainly three stages:
 * 1. All workers send their partial data to other workers for reduce.
 * 2. All workers do reduce on all data they received and then send partial results to others.
 * 3. All workers merge partial results into the final result and put it into the session
 *    context with a pre-defined object name.
 */
```
Translated: all workers perform reduce on partitions of the data, and all workers obtain the final reduce result, in three stages:

1. Each worker sends the partial data needed for the reduce to the other workers.
2. Each worker reduces the data it received, then sends that partial reduce result to the other workers.
3. Each worker merges the partial results into the final result and puts it into a pre-defined session context variable.
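These three stages amount to a scatter-reduce followed by an all-gather. The sketch below simulates that data flow in a single JVM (arrays stand in for workers; all names here are illustrative, not Alink's API):

```java
import java.util.Arrays;

public class ThreeStageAllReduce {
    // Each worker holds a full-length buffer; worker w "owns" partition w.
    public static void allReduce(double[][] buffers) {
        int numWorkers = buffers.length;
        int len = buffers[0].length;

        // Stages 1 + 2 (scatter-reduce): worker w sums everyone's slice of partition w.
        double[][] partialResults = new double[numWorkers][];
        for (int w = 0; w < numWorkers; w++) {
            int start = w * len / numWorkers, end = (w + 1) * len / numWorkers;
            double[] partial = new double[end - start];
            for (double[] buffer : buffers) {          // "receive" slices from all workers
                for (int i = start; i < end; i++) {
                    partial[i - start] += buffer[i];
                }
            }
            partialResults[w] = partial;               // "send" the reduced slice onward
        }

        // Stage 3 (all-gather): every worker merges the partials into the final result.
        for (double[] buffer : buffers) {
            for (int w = 0; w < numWorkers; w++) {
                int start = w * len / numWorkers;
                System.arraycopy(partialResults[w], 0, buffer, start, partialResults[w].length);
            }
        }
    }

    public static void main(String[] args) {
        double[][] buffers = {{1, 1, 1, 1}, {2, 2, 2, 2}};
        allReduce(buffers);
        System.out.println(Arrays.deepToString(buffers)); // [[3.0, 3.0, 3.0, 3.0], [3.0, 3.0, 3.0, 3.0]]
    }
}
```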
"What is learned on paper is always shallow; to truly grasp it, you must practice it yourself."
To realize AllReduce, Alink has done a great deal of work behind the scenes, which we analyze piece by piece below.
0x03 How Sharing Is Implemented
Sharing is the first thing to get right when implementing AllReduce, because the merge/broadcast process needs metadata as well as input and output, and shared variables greatly simplify all of this. Let's look at how Alink implements sharing through the TaskManager.
1. Task-related concepts

- **Task**: a collection of subtasks that perform the same function at one stage, similar to Spark's TaskSet.
- **SubTask**: the smallest execution unit of a task in Flink. It is an instance of a Java class, with properties and methods, that carries out a specific piece of computation logic.
- **Chain optimization**: in principle, each parallel instance of an operator is one subtask. This would cause serious overhead, because Flink's TaskManager runs each task in its own thread, and excessive thread switching hurts throughput. To mitigate this, Flink performs an optimization: it chains subtasks together, and the task produced by chaining is scheduled and executed as one unit in one thread.
- **Operator Chains**: the mechanism by which Flink combines multiple subtasks into one task, each task being executed by a single thread. Operator Chains fuse several separate subtasks into a single task, similar to Spark's pipeline.
- **Slot**: Flink's unit of compute-resource isolation. Multiple subtasks can run in one slot, but they must come from different stages of the same application. As a result, each slot can host an entire pipeline of the job.
Flink programs are parallel by nature. During execution, each operator (transformation) has one or more operator subtasks. These subtasks are independent of one another and execute in different threads, possibly on different machines or containers.

Within one application, subtasks of different tasks may run in the same slot, but multiple subtasks of the same task cannot run in one slot; they are spread across different slots. In particular, parallel instances of AllReduceSend cannot run in the same slot.
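These knobs are also visible from user code. The following sketch uses standard Flink DataStream APIs to illustrate the chaining and slot-sharing concepts above (it is only an illustration of the concepts; Alink's AllReduce code does not require these calls):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);               // two parallel subtasks per operator

        env.fromElements(1, 2, 3)
           .map((MapFunction<Integer, Integer>) x -> x * 2)
           .disableChaining()                // force this operator into its own task
           .map((MapFunction<Integer, Integer>) x -> x + 1)
           .slotSharingGroup("side")         // schedule downstream subtasks into other slots
           .print();

        env.execute("chaining and slot sharing demo");
    }
}
```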
2. TaskManager
Each TaskManager in Flink is a JVM process that may execute one or more subtasks on separate threads. The TaskManager acts as the Slave node of the cluster, responsible for executing concrete tasks and for requesting and managing the resources those tasks need on its node.

To isolate resources and bound the number of tasks it accepts, the TaskManager introduces the slot concept. Slots isolate resources only in terms of memory, divided evenly: a TaskManager has at least one slot, and if a TM has N slots, each slot gets 1/N of the TM's memory. Slots within a TM are isolated only in memory; the CPU is shared among them.

The client compiles and packages the Flink application and submits it to the JobManager. Based on the resources of the TaskManagers registered with it, the JobManager then assigns the tasks to TaskManagers that have available resources, and the tasks are started and run.

A TaskManager receives the tasks to deploy from the JobManager, starts them with slot resources, establishes network connections for data exchange, receives data, and begins processing. TaskManagers exchange data with one another through data streams.

Flink tasks run multithreaded: one TaskManager (TM) executes multiple tasks concurrently in multiple threads. This is very different from the multi-JVM approach of MapReduce. Sharing system resources among tasks through TaskSlots lets Flink use the CPU much more efficiently, and each TaskManager manages its pool of TaskSlots to manage resources effectively.
The upshot: multiple parallel AllReduceSend instances running inside the same TaskManager share all of that TaskManager's static variables.
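This relies on a basic JVM fact: all threads in one process see the same static fields. A minimal stand-alone sketch (the names are illustrative, not Alink code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaticSharingDemo {
    // One copy per JVM (i.e., per TaskManager), visible to every task thread.
    private static final Map<String, double[]> SHARED = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> SHARED.put("buffer", new double[]{1.0, 2.0}));
        writer.start();
        writer.join();

        // Another "task" thread in the same JVM reads the same static map.
        Thread reader = new Thread(() ->
            System.out.println("len=" + SHARED.get("buffer").length)); // len=2
        reader.start();
        reader.join();
    }
}
```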
3. State sharing

Alink implements variable sharing with static variables in the TaskManager. The main classes and concepts involved are somewhat intricate; going from top to bottom, each level needs more identifiers and more state.
3.1 Conceptual Analysis
From the top down the levels are as follows:
Algorithm perspective: ComContext

User code calls context.getObj(bufferName). This is ideal from the user's point of view: the user knows the variable name and accesses it through the context.

ComContext itself needs to know more, for example its own sessionId and taskId, as explained below.

ComContext calls downward like this: SessionSharedObjs.put(objName, sessionId, taskId, obj);
Framework perspective: IterativeComQueue

IterativeComQueue is a framework concept. Taking KMeans as an example, the KMeans algorithm corresponds to several IterativeComQueues.

An IterativeComQueue carries many compute/communicate functions. Each function needs to know which IterativeComQueue it belongs to and how to communicate with the other functions on that queue, without being confused with any other queue. So a concept is needed to represent the queue itself; hence the session concept below.
Session perspective: SessionSharedObjs

To distinguish between IterativeComQueues, the session concept was introduced. All compute/communicate functions on an IterativeComQueue are bound to the same session id, so all functions on the same IterativeComQueue can communicate with one another.

One IterativeComQueue corresponds to one session, so <variable name, sessionId> identifies a variable that this session can access.
SessionSharedObjs contains these static member variables:

- int sessionId = 0: an auto-incrementing counter used to distinguish sessions.
- HashMap<Tuple2<String, Integer>, Long> key2Handle: a mapping stating that a given variable name in a given session corresponds to a given variable handle.

Normally "a variable with some name" corresponds to one variable handle; that is, one <variable name, session> pair maps to one handle. But in Flink multiple subtasks operate in parallel, so another level is needed to identify the copy of the variable belonging to each subtask, keyed by taskId. Hence the state concept below.
SessionSharedObjs calls downward: IterTaskObjKeeper.put(handle, taskId, obj);

Subtask perspective: IterTaskObjKeeper

This is where sharing is actually implemented, via static variables: one shared instance accessible to all tasks (threads) of the TaskManager.
IterTaskObjKeeper contains these static member variables:

- long handle = 0L: an auto-incrementing identifier used to distinguish states.
- Map<Tuple2<Long, Integer>, Object> states: a mapping from <handle, taskId> to the state instance, i.e. from "which variable" and "which task" to that task's copy of the variable. This is the per-subtask subdivision.

In Flink an algorithm is run in parallel by multiple subtasks. If there were only one copy per handle, multiple subtasks would all access it concurrently, causing the familiar multithreading problems. So Alink splits each handle into multiple states; from the subtask's perspective, each state is uniquely identified by <handle, taskId>.
To sum up: for the same variable name, the shared state seen by each subtask is actually independent, and the subtasks do not interfere with one another. Sharing is essentially sharing among the operators that run within one subtask.
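Putting the three levels together, resolving a shared variable comes down to two map lookups. Here is a condensed sketch of the whole chain (the field shapes follow the classes analyzed below, but the method is a simplification, not Alink's actual code):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.HashMap;

public class LookupChainSketch {
    // Level 2: (variable name, sessionId) -> handle, as in SessionSharedObjs.key2Handle.
    static HashMap<Tuple2<String, Integer>, Long> key2Handle = new HashMap<>();
    // Level 3: (handle, taskId) -> per-subtask state, as in IterTaskObjKeeper.states.
    static HashMap<Tuple2<Long, Integer>, Object> states = new HashMap<>();

    // Level 1: what ComContext.getObj(objName) boils down to for one subtask.
    @SuppressWarnings("unchecked")
    static <T> T getObj(String objName, int sessionId, int taskId) {
        Long handle = key2Handle.get(Tuple2.of(objName, sessionId));
        return handle == null ? null : (T) states.get(Tuple2.of(handle, taskId));
    }

    public static void main(String[] args) {
        key2Handle.put(Tuple2.of("centroidAllReduce", 0), 7L);
        states.put(Tuple2.of(7L, 3), new double[15]);     // subtask 3's private copy
        double[] buf = getObj("centroidAllReduce", 0, 3);
        System.out.println(buf.length);                   // 15
    }
}
```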
3.2 Analysis of variable instances
We can get a clearer picture from the variables of an actual run.
```
// In session 0, the handle corresponding to the variable centroidAllReduce is 7.
SessionSharedObjs.key2Handle = {HashMap@10480} size = 9
 {Tuple2@10492} "(initCentroid,0)"      -> {Long@10493} 1
 {Tuple2@10494} "(statistics,0)"        -> {Long@10495} 2
 {Tuple2@10496} "(362158a2-588b-429f-b848-c901a1e15e17,0)" -> {Long@10497} 8
 {Tuple2@10498} "(k,0)"                 -> {Long@10499} 6
 {Tuple2@10500} "(centroidAllReduce,0)" -> {Long@10501} 7   // the variable discussed here
 {Tuple2@10502} "(trainData,0)"         -> {Long@10503} 0
 {Tuple2@10504} "(vectorSize,0)"        -> {Long@10505} 3
 {Tuple2@10506} "(centroid2,0)"         -> {Long@10507} 5
 {Tuple2@10508} "(centroid1,0)"         -> {Long@10509} 4

// As can be seen below, the variable with handle 7 has four subtasks,
// so it is subdivided into four states.
com.alibaba.alink.common.comqueue.IterTaskObjKeeper.states = {HashMap@10520} size = 36
 {Tuple2@10571} "(7,0)" -> {double[15]@10572}
 {Tuple2@10573} "(7,1)" -> {double[15]@10574}
 {Tuple2@10577} "(7,2)" -> {double[15]@10578}
 {Tuple2@10581} "(7,3)" -> {double[15]@10582}
 {Tuple2@10575} "(5,0)" -> {Tuple2@10576} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@29a72fbb)"
 {Tuple2@10579} "(5,1)" -> {Tuple2@10580} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@26c52354)"
 {Tuple2@10585} "(5,2)" -> {Tuple2@10586} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@7c6ed779)"
 {Tuple2@10588} "(5,3)" -> {Tuple2@10589} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@154b8a4d)"
```
Let’s go through the code and parse each of the classes.
3.3 ComContext
ComContext is the topmost class, used to obtain runtime information and shared variables. All compute/communicate functions on the IterativeComQueue (BaseComQueue) access shared variables through ComContext. For example:
```java
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {
    // Each BaseComQueue gets a unique sessionId.
    private final int sessionId = SessionSharedObjs.getNewSessionId();
    int taskId = getRuntimeContext().getIndexOfThisSubtask();

    public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
        ComContext context = new ComContext(sessionId, getIterationRuntimeContext());
        if (getIterationRuntimeContext().getSuperstepNumber() == maxIter || criterion) {
            // Keep using the ComContext to access shared variables.
            List<Row> model = completeResult.calc(context);
        }
    }
}

// In user code, a shared variable is then fetched like this:
double[] sendBuf = context.getObj(bufferName);
```
As you can see, ComContext is the top-level context concept seen by the user; taskId and sessionId are the keys to how it works.

- sessionId: a static class member variable defined in SessionSharedObjs that auto-increments. Each BaseComQueue gets a unique sessionId, i.e. each queue maintains one unique session, so all ComContexts created within one BaseComQueue share the same sessionId.
- taskId: obtained from the runtime context.
```java
/**
 * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
 */
@Internal
public class TaskInfo {
    /**
     * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
     * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
     *
     * @return The index of the parallel subtask.
     */
    public int getIndexOfThisSubtask() {
        return this.indexOfSubtask; // get the taskId
    }
}
```
The concrete ComContext class is defined as follows:
```java
/**
 * Context used in BaseComQueue to access basic runtime information and shared objects.
 */
public class ComContext {
    private final int taskId;
    private final int numTask;
    private final int stepNo;
    private final int sessionId;

    public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
        this.sessionId = sessionId;
        this.numTask = runtimeContext.getNumberOfParallelSubtasks();
        this.taskId = runtimeContext.getIndexOfThisSubtask();
        this.stepNo = runtimeContext.getSuperstepNumber();
    }

    /**
     * Put an object into shared objects for access of other QueueItem of the same taskId.
     *
     * @param objName object name
     * @param obj object itself.
     */
    public void putObj(String objName, Object obj) {
        SessionSharedObjs.put(objName, sessionId, taskId, obj);
    }
}

// For example, an instance looks like this:
// this = {ComContext@10578}
//     taskId = 4
//     numTask = 8
//     stepNo = 1
//     sessionId = 0
```
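As a usage sketch: a compute function on the queue reads and writes shared objects only through this context. The class shape below follows Alink's ComputeFunction, but the body and names are illustrative, not actual Alink code:

```java
// A hedged sketch: the signature follows Alink's ComputeFunction; the body is illustrative.
public class UpdateBuffer extends ComputeFunction {
    @Override
    public void calc(ComContext context) {
        // Read this subtask's copy of the shared buffer (created on first use).
        double[] sendBuf = context.getObj("sendBuf");
        if (sendBuf == null) {
            sendBuf = new double[15];
            context.putObj("sendBuf", sendBuf);   // stored under (name, sessionId, taskId)
        }
        sendBuf[0] += 1.0;                        // local update, visible to later steps
    }
}
```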
3.4 SessionSharedObjs
SessionSharedObjs is the next level down: it maintains shared session objects, which are shared via session ids.

SessionSharedObjs maintains a static class variable sessionId in order to differentiate sessions.

The core of SessionSharedObjs is HashMap<Tuple2<String, Integer>, Long> key2Handle: a mapping from <variable name, sessionId> to the real variable handle.

One IterativeComQueue corresponds to one session, so <variable name, sessionId> identifies a variable that this IterativeComQueue can access; normally one handle per variable would be enough.

But because an IterativeComQueue is executed by several subtasks in parallel, each handle is further subdivided into several states for mutual exclusion and distinction. Each state is uniquely identified by <handle, taskId>, as described below.
```java
/**
 * An static class that manage shared objects for {@link BaseComQueue}s.
 */
class SessionSharedObjs implements Serializable {
    private static HashMap<Tuple2<String, Integer>, Long> key2Handle = new HashMap<>();
    private static int sessionId = 0;
    private static ReadWriteLock rwlock = new ReentrantReadWriteLock();

    /**
     * Get a new session id.
     * All access operation should bind with a session id. This id is usually shared
     * among compute/communicate function of an {@link IterativeComQueue}.
     *
     * @return new session id.
     */
    synchronized static int getNewSessionId() {
        return sessionId++;
    }

    static void put(String objName, int session, int taskId, Object obj) {
        rwlock.writeLock().lock();
        try {
            Long handle = key2Handle.get(Tuple2.of(objName, session));
            if (handle == null) {
                handle = IterTaskObjKeeper.getNewHandle();
                key2Handle.put(Tuple2.of(objName, session), handle);
            }
            // Delegate downward; taskId is also part of the key.
            IterTaskObjKeeper.put(handle, taskId, obj);
        } finally {
            rwlock.writeLock().unlock();
        }
    }
}
```
3.5 IterTaskObjKeeper
This is the lowest-level sharing class: a static instance living in the heap of the TaskManager process, shared by all tasks (threads) of the TaskManager.

IterTaskObjKeeper shares data across the whole JVM through the static variable states, whose contents are keyed by handle and taskId.

IterTaskObjKeeper maintains handle as an auto-incrementing unique identifier for each kind of "variable state".

<handle, taskId> is then the unique identifier of one "variable state": the copy of the variable belonging to one task. This is the per-task subdivision of the variable shared in the TaskManager process heap.
```java
/**
 * A 'state' is an object in the heap memory of task manager process,
 * shared across all tasks (threads) in the task manager.
 * Note that the 'state' is shared by all tasks on the same task manager,
 * users should guarantee that no two tasks modify a 'state' at the same time.
 * A 'state' is identified by 'handle' and 'taskId'.
 */
public class IterTaskObjKeeper implements Serializable {
    private static Map<Tuple2<Long, Integer>, Object> states;

    /**
     * A 'handle' is a unique identifier of a state.
     */
    private static long handle = 0L;

    private static ReadWriteLock rwlock = new ReentrantReadWriteLock();

    static {
        states = new HashMap<>();
    }

    /**
     * @note Should get a new handle on the client side and pass it to transformers.
     */
    synchronized public static long getNewHandle() {
        return handle++;
    }

    public static void put(long handle, int taskId, Object state) {
        rwlock.writeLock().lock();
        try {
            states.put(Tuple2.of(handle, taskId), state);
        } finally {
            rwlock.writeLock().unlock();
        }
    }
}
```
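The class also needs a read path symmetric to put (the actual Alink source has more, e.g. removal, which is omitted here). A minimal get under the same locking discipline might look like this sketch, which is not the verbatim Alink source:

```java
// A sketch of the read path, mirroring put() above; not the verbatim Alink source.
@SuppressWarnings("unchecked")
public static <T> T get(long handle, int taskId) {
    rwlock.readLock().lock();
    try {
        return (T) states.get(Tuple2.of(handle, taskId));
    } finally {
        rwlock.readLock().unlock();
    }
}
```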