0x00 Abstract
Alink is a new-generation machine learning algorithm platform that Alibaba built on the real-time computing engine Flink. It is the first machine learning platform in the industry to support both batch and streaming algorithms. This article analyzes how the AllReduce communication model is implemented in Alink.
AllReduce is used widely in Alink, for example in KMeans, LDA, Word2Vec, GD, LBFGS, Newton Method, OWLQN, SGD, GBDT, and Random Forest.
Because very little public material about Alink is available, much of what follows is my own inference and will certainly contain omissions. Please point them out, and I will update the article.
0x01 Sample code
Our sample code remains as follows.
KMeansTrainBatchOp call
static DataSet <Row> iterateICQ(...omitted...) {
    return new IterativeComQueue()
        .initWithPartitionedData(TRAIN_DATA, data)
        .initWithBroadcastData(INIT_CENTROID, initCentroid)
        .initWithBroadcastData(KMEANS_STATISTICS, statistics)
        .add(new KMeansPreallocateCentroid())
        .add(new KMeansAssignCluster(distance))
        .add(new AllReduce(CENTROID_ALL_REDUCE))
        .add(new KMeansUpdateCentroids(distance))
        .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
        .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
        .setMaxIter(maxIter)
        .exec();
}
AllReduce implementation
The main code excerpt of Alink’s AllReduce is as follows:
public static <T> DataSet <T> allReduce(...omitted...) {
return input
.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
.withBroadcastSet(input, "barrier")
.returns(
new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
.name("AllReduceSend")
.partitionCustom(new Partitioner <Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key;
}
}, 0)
.name("AllReduceBroadcastRaw")
.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
.returns(
new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
.name("AllReduceSum")
.partitionCustom(new Partitioner <Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key;
}
}, 0)
.name("AllReduceBroadcastSum")
.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
.returns(input.getType())
.name("AllReduceRecv");
}
0x02 AllReduce implementation
Based on the code above, the overall flow of AllReduce can be summarized as follows:
- KMeansAssignCluster: Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. Each task then writes its locally computed sums to CENTROID_ALL_REDUCE in its own TaskManager.
- Each AllReduceSend reads the previously stored sums from CENTROID_ALL_REDUCE in its own TaskManager (each AllReduceSend sees only its own copy) and sends them to the downstream tasks. Which part of the variable goes to which task is computed from that task's index and the data volume. The same computation (done partly in code, and partly carried along with the data as metadata) is reused later in AllReduceRecv.
- Each AllReduceSum receives the pieces sent by the AllReduceSend tasks, sums them, and sends its summed result to every downstream task.
- Each AllReduceRecv receives the summed pieces from all AllReduceSum tasks and stores them in the shared variable CENTROID_ALL_REDUCE. Where each piece is stored follows the same computation as in AllReduceSend, so writes to the shared variable never conflict. This is effectively a merge: with five AllReduceSum tasks, for example, each one sends its data to all five AllReduceRecv tasks. Each AllReduceRecv writes the five pieces it receives into the state that belongs to its own subtask index. The position of each piece within that state is given by the metadata sent with the data, so the pieces do not overwrite one another, and every AllReduceRecv ends up with all five pieces.
- KMeansUpdateCentroids: Read the CENTROID_ALL_REDUCE variable and update the centroids based on the sum of points and the number of points belonging to the same cluster.
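The send / sum / receive flow summarized above can be modeled in a single process. The following is only a toy sketch, not Alink code: the class `AllReduceSketch` and its methods are hypothetical names, each "piece" is a single array element rather than a 4096-element block, and the contiguous split follows the same rule as the `DefaultDistributedInfo` excerpt shown later in this article.

```java
import java.util.Arrays;

/** Toy single-process model of the send / sum / receive stages (hypothetical, not Alink code). */
class AllReduceSketch {
    /** First element owned by a task; same contiguous split as DefaultDistributedInfo. */
    static int startPos(int task, int n, int total) {
        int div = total / n;
        int mod = total % n;
        return task < mod ? div * task + task : div * task + mod;
    }

    /** Number of elements owned by a task. */
    static int count(int task, int n, int total) {
        return total / n + (task < total % n ? 1 : 0);
    }

    /** local[w] is worker w's buffer; the result holds the element-wise sum for every worker. */
    static double[][] allReduce(double[][] local) {
        int n = local.length;
        int len = local[0].length;

        // Send + sum: task t receives its slice from every worker and adds the slices up.
        double[][] partial = new double[n][];
        for (int t = 0; t < n; t++) {
            int start = startPos(t, n, len);
            int cnt = count(t, n, len);
            partial[t] = new double[cnt];
            for (double[] buf : local) {
                for (int j = 0; j < cnt; j++) {
                    partial[t][j] += buf[start + j];
                }
            }
        }

        // Receive: every worker copies every partial sum back to its original offset.
        double[][] result = new double[n][len];
        for (int w = 0; w < n; w++) {
            for (int t = 0; t < n; t++) {
                System.arraycopy(partial[t], 0, result[w], startPos(t, n, len), partial[t].length);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] local = {
            {1, 2, 3, 4, 5},
            {10, 20, 30, 40, 50},
            {100, 200, 300, 400, 500}
        };
        for (double[] row : allReduce(local)) {
            System.out.println(Arrays.toString(row)); // [111.0, 222.0, 333.0, 444.0, 555.0]
        }
    }
}
```

Because every slice is written back at the offset it was read from, the receive stage needs no coordination between workers.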
1. KMeansAssignCluster
The purpose of this class is to find the nearest cluster center for each point and to accumulate, per cluster center, the point count and the coordinate sums.
KMeansAssignCluster stores CENTROID_ALL_REDUCE through ComContext for the subsequent AllReduce. If there are five KMeansAssignCluster instances, their results generally differ: they all store under the same variable name CENTROID_ALL_REDUCE, but each one's state is different.
Because the five KMeansAssignCluster instances correspond to five subtasks, their <handle, taskId> keys in the shared variables differ, so they map to different states and are stored separately.
// Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
public class KMeansAssignCluster extends ComputeFunction {
    // ...
    double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
    if (sumMatrixData == null) {
        sumMatrixData = new double[k * (vectorSize + 1)];
        context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
    }
    for (FastDistanceVectorData sample : trainData) {
        // Find the closest centroid from centroids for sample, and add the sample to sumMatrix.
        KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance, distanceMatrix);
    }
}

sample = {FastDistanceVectorData@13274}
 vector = {DenseVector@13281} "6.3 2.5 4.9 1.5"
 label = {DenseVector@13282} "72.2"
 rows = {Row[1]@13283}

// 4-dimensional vector + 1 weight per cluster; every entry is a sum over samples.
sumMatrixData = {double[15]@10574}
 0 = 23.6
 1 = 14.9
 2 = 8.7
 3 = 1.70000000000000000002
 4 = 5.0
 5 = 52.400000000000006
 6 = 25.1
 7 = 39.6999999999996
 8 = 13.299999999999999
 9 = 9.0
 10 = 33.0
 11 = 16.9
 12 = 28.900000000000002
 13 = 11.4
 14 = 5.0

trainData = {ArrayList@10580} size = 19
 0 = {FastDistanceVectorData@10590}
  vector = {DenseVector@10595} "7.7 3.8 6.7 2.2"
   data = {double[4]@10601}
    0 = 7.7
    1 = 3.8
    2 = 6.7
    3 = 2.2
  label = {DenseVector@10596} "123.46000000000001"
  rows = {Row[1]@10597}
 1 = {FastDistanceVectorData@10603}
  vector = {DenseVector@10623} "5.7 2.8 4.1 1.3"
  label =
  rows = {Row[1]@10625}
 2 = {FastDistanceVectorData@10604}
 3 = {FastDistanceVectorData@10605}
 ...
 17 = {FastDistanceVectorData@10619}
 18 = {FastDistanceVectorData@10620}
  vector = {DenseVector@10654} "6.5 3.0 5.2 2.0"
  label = {DenseVector@10655} "82.29"
  rows = {Row[1]@10656}
2. AllReduceSend
This part of the code is worth quoting again, mainly because of withBroadcastSet. Its role is:
- It can be thought of as a shared variable. A DataSet is broadcast, and the tasks on each node can then read it; each node holds only one copy of the data.
- Without broadcasting, every task on a node needs its own copy of the DataSet, which wastes memory (one node may hold several copies).
return input
    .mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
    .withBroadcastSet(input, "barrier")
KMeansAssignCluster writes its data to the context variable centroidAllReduce, so AllReduce is essentially waiting for this variable.
The first step of AllReduce is to read the shared variable from the context and send it. AllReduceSend does this.
For every AllReduceSend task, bufferName is centroidAllReduce.
Because each AllReduceSend runs as a different task, each one reads a different centroidAllReduce, so the sendBuf obtained by each task is different. Each task takes the "centroidAllReduce" state that corresponds to its own <handle, taskId> and sends it downstream.
When sending downstream, AllReduceSend loops over all subtasks: the shared variable held by this task is split across the downstream tasks, and which part goes to which task is computed from that task's index and the data volume. If the data volume is small, the data may go to only one or a few tasks.
In the code that follows, taskId is the subtask ID.
How much to send to which task is computed in DefaultDistributedInfo, which has to be read together with the pieces function. Note that AllReduceRecv later receives according to the same scheme that AllReduceSend uses to send, which is what allows AllReduceRecv to merge the data.
// The data to send is split into pieces, for example 50 pieces; each piece has size TRANSFER_BUFFER_SIZE.
int pieces = pieces(sendLen);
// The pieces are distributed over the subtasks, for example 8 of them.
for (int i = 0; i < numOfSubTasks; ++i) {
    // Index of the first piece sent to subtask i.
    int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces);
    // How many pieces are sent to subtask i.
    int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);
    // ...
}
The specific code is as follows:
private static int pieces(int len) {
    int div = len / TRANSFER_BUFFER_SIZE; // The size of each piece is TRANSFER_BUFFER_SIZE
    int mod = len % TRANSFER_BUFFER_SIZE;
    return mod == 0 ? div : div + 1;
}

public class DefaultDistributedInfo implements DistributedInfo {
    public long startPos(long taskId, long parallelism, long globalRowCnt) {
        long div = globalRowCnt / parallelism;
        long mod = globalRowCnt % parallelism;
        if (mod == 0) {
            return div * taskId;
        } else if (taskId >= mod) {
            return div * taskId + mod;
        } else {
            return div * taskId + taskId;
        }
    }

    public long localRowCnt(long taskId, long parallelism, long globalRowCnt) {
        long div = globalRowCnt / parallelism;
        long mod = globalRowCnt % parallelism;
        if (mod == 0) {
            return div;
        } else if (taskId >= mod) {
            return div;
        } else {
            return div + 1;
        }
    }
}
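To make the split concrete, the sketch below copies the two formulas into a standalone class and prints the assignment for 50 pieces over 8 subtasks, the numbers used in the comments above. The class name `PieceSplitDemo` is made up for illustration, and `TRANSFER_BUFFER_SIZE = 4096` is taken from the `// 4096` comment and the `double[4096]` debugger views in this article.

```java
/** Standalone copy of the split formulas, for experimentation only (hypothetical class). */
class PieceSplitDemo {
    static final int TRANSFER_BUFFER_SIZE = 4096;

    /** Number of fixed-size pieces needed to hold len doubles (ceiling division). */
    static int pieces(int len) {
        int div = len / TRANSFER_BUFFER_SIZE;
        int mod = len % TRANSFER_BUFFER_SIZE;
        return mod == 0 ? div : div + 1;
    }

    /** Index of the first piece owned by taskId. */
    static long startPos(long taskId, long parallelism, long globalRowCnt) {
        long div = globalRowCnt / parallelism;
        long mod = globalRowCnt % parallelism;
        if (mod == 0) {
            return div * taskId;
        } else if (taskId >= mod) {
            return div * taskId + mod;
        } else {
            return div * taskId + taskId;
        }
    }

    /** Number of pieces owned by taskId; the first (globalRowCnt % parallelism) tasks get one extra. */
    static long localRowCnt(long taskId, long parallelism, long globalRowCnt) {
        long div = globalRowCnt / parallelism;
        long mod = globalRowCnt % parallelism;
        return taskId < mod ? div + 1 : div;
    }

    public static void main(String[] args) {
        int pieces = 50;
        int parallelism = 8;
        for (int task = 0; task < parallelism; task++) {
            System.out.printf("task %d: startPos=%d cnt=%d%n",
                task, startPos(task, parallelism, pieces), localRowCnt(task, parallelism, pieces));
        }
        // task 0: startPos=0 cnt=7
        // task 1: startPos=7 cnt=7
        // task 2: startPos=14 cnt=6
        // ...
        // task 7: startPos=44 cnt=6
    }
}
```

With the 15-element KMeans buffer from the debugger output, `pieces(15)` is 1, so only subtask 0 receives any data. That matches the remark that small data may go to only one task.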
The AllReduceSend code is shown below; see the comments for details.
// The variable name is defined here.
public static final String CENTROID_ALL_REDUCE = "centroidAllReduce";

private static class AllReduceSend<T> extends RichMapPartitionFunction <T, Tuple3 <Integer, Integer, double[]>> {
    // Tied to the parallelism; every task performs the same operation.
    int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    // bufferName is centroidAllReduce in every task.
    // Compute the data structures needed for sending.
    int pieces = pieces(sendLen);
    DistributedInfo distributedInfo = new DefaultDistributedInfo();
    double[] sendBuf = context.getObj(bufferName);
    int agg = 0;
    // Send the required data to each task. How it is distributed depends on the size of the data:
    // if the data volume is small, it may go to only one or a few tasks.
    for (int i = 0; i < numOfSubTasks; ++i) {
        // startPos: which part of the variable to send, determined by the task index.
        // cnt: how many pieces are sent to downstream task i. If 0, nothing is sent to task i.
        int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces);
        int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);
        for (int j = 0; j < cnt; ++j) {
            int bufStart = (startPos + j) * TRANSFER_BUFFER_SIZE;
            // the last piece
            if (startPos + j == pieces - 1) {
                System.arraycopy(sendBuf, bufStart, transBuf, 0, lastLen(sendLen));
            } else {
                System.arraycopy(sendBuf, bufStart, transBuf, 0, TRANSFER_BUFFER_SIZE);
            }
            agg++;
            // i is the index of the subtask and startPos + j is the position in the buffer.
            // The subsequent partitioning is done by this i, so this AllReduceSend sends to numOfSubTasks tasks.
            out.collect(Tuple3.of(i, startPos + j, transBuf));
        }
    }
}

private static int pieces(int len) {
    int div = len / TRANSFER_BUFFER_SIZE; // 4096
    int mod = len % TRANSFER_BUFFER_SIZE;
    return mod == 0 ? div : div + 1;
}

sendBuf = {double[15]@10602}
 0 = 40.3
 1 = 18.200000000000003
 2 = 33.6
 3 = 12.5
 4 = 6.0
 5 = 45.3
 6 = 30.599999999999999999998
 7 = 12.4
 8 = 2.0
 9 = 9.0
 10 = 24.0
 11 = 10.4
 12 = 17.1
 13 = 5.1999999999999
 14 = 4.0

this = {AllReduce$AllReduceSend@10598}
 bufferName = "centroidAllReduce"
 lengthName = null
 transferBufferName = "3dfb2aae-683d-4497-91fc-30b8d6853bce"
 sessionId = 0
 runtimeContext = {AbstractIterativeTask$IterativeRuntimeUdfContext@10606}
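The slicing step can be tried in isolation. The sketch below is an illustration under assumptions, not the Alink implementation: `SendSliceDemo` and `slice` are hypothetical names, the piece size is a parameter instead of the fixed `TRANSFER_BUFFER_SIZE`, and the length of the last piece is computed with `Math.min` because the `lastLen` helper is not shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Cuts a send buffer into fixed-size transfer pieces (hypothetical illustration). */
class SendSliceDemo {
    static List<double[]> slice(double[] sendBuf, int pieceSize) {
        int pieces = (sendBuf.length + pieceSize - 1) / pieceSize; // ceiling division
        List<double[]> out = new ArrayList<>();
        for (int p = 0; p < pieces; p++) {
            // A fresh, zero-filled buffer per piece; the last piece is only partly filled.
            double[] transBuf = new double[pieceSize];
            int bufStart = p * pieceSize;
            int len = Math.min(pieceSize, sendBuf.length - bufStart);
            System.arraycopy(sendBuf, bufStart, transBuf, 0, len);
            out.add(transBuf);
        }
        return out;
    }

    public static void main(String[] args) {
        for (double[] piece : slice(new double[] {1, 2, 3, 4, 5}, 2)) {
            System.out.println(Arrays.toString(piece));
        }
        // [1.0, 2.0]
        // [3.0, 4.0]
        // [5.0, 0.0]
    }
}
```

The zero padding of the last piece is consistent with the debugger views in this article, where a 15-element buffer travels as a `double[4096]` whose remaining entries are 0.0.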
3. AllReduceBroadcastRaw
AllReduceSend uses a custom partitioner (partitionCustom) when sending data downstream, with the subtask index as the partition key. This matches the first field of the tuples emitted by out.collect in AllReduceSend.
.partitionCustom(new Partitioner <Integer>() {
    @Override
    public int partition(Integer key, int numPartitions) {
        return key;
    }
}, 0)
.name("AllReduceBroadcastRaw")

AllReduce$2 (com.alibaba.alink.common.comqueue.communication)
partition:99, AllReduce$2 (com.alibaba.alink.common.comqueue.communication)
customPartition:235, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:149, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:120, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
mapPartition:257, AllReduce$AllReduceSend (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)

// AllReduceSend.mapPartition makes the call here:
for (int i = 0; i < numOfSubTasks; ++i) {
    // i is the index of the subtask; the subsequent partitioning is done by this index.
    // This AllReduceSend sends to numOfSubTasks tasks.
    out.collect(Tuple3.of(i, startPos + j, transBuf));
}

// As the subsequent call sequence shows, the subtask index is used as the partition key.
public class CountingCollector<OUT> implements Collector<OUT> {
    public void collect(OUT record) {
        this.numRecordsOut.inc();
        this.collector.collect(record);
    }
}

record = {Tuple3@10586} "(0,0,[40.50000000000001, 18.7, 33.300000000000004, 12.8, 6.0, 29.7, 21.0, 8.4, 1.7, 6.0, 48.1, 22.199999999999996, 36.0, 12.200000000000001, 8.0, 0.0,"
 f0 = {Integer@10583} 0
 f1 = {Integer@10583} 0
 f2 = {double[4096]@10598}

public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
    private int customPartition(T record, int numberOfChannels) {
        if (extractedKeys == null) {
            extractedKeys = new Object[1];
        }
        if (comparator.extractKeys(record, extractedKeys, 0) == 1) {
            final Object key = extractedKeys[0];
            return partitioner.partition(key, numberOfChannels);
        }
    }
}

public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
    public int extractKeys(Object record, Object[] target, int index) {
        int localIndex = index;
        for (int i = 0; i < comparators.length; i++) {
            localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex);
        }
        return localIndex - index;
    }
}

key = {Integer@10583} 0
 value = 0
extractedKeys = {Object[1]@10587}
 0 = {Integer@10583} 0
  value = 0
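The effect of the identity partitioner can be shown without Flink. In this minimal sketch, `IdentityPartitionDemo` and `route` are hypothetical names, and each record is reduced to a `(targetSubtask, pieceIndex)` pair standing in for the first two fields of the `Tuple3`; the payload is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Routes records by an identity partitioner, outside of Flink (hypothetical illustration). */
class IdentityPartitionDemo {
    /** Same contract as the anonymous Partitioner in the excerpt: the key itself is the target channel. */
    static int partition(int key, int numPartitions) {
        return key;
    }

    /** Delivers each (targetSubtask, pieceIndex) record to the inbox chosen by the partitioner. */
    static List<List<Integer>> route(int[][] records, int numPartitions) {
        List<List<Integer>> inboxes = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            inboxes.add(new ArrayList<>());
        }
        for (int[] record : records) {
            inboxes.get(partition(record[0], numPartitions)).add(record[1]);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        // Two senders, each emitting piece p to subtask p.
        int[][] records = { {0, 0}, {1, 1}, {2, 2}, {0, 0}, {1, 1}, {2, 2} };
        System.out.println(route(records, 3)); // [[0, 0], [1, 1], [2, 2]]
    }
}
```

Since the sender already decides the target subtask when it builds the tuple, the partitioner only has to pass that decision through.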
4. AllReduceSum
All workers do a partial reduce on the data they receive and then send the partial results to the other workers.
The results are partial because each task receives a different slice of the data: the upstream tasks compute the position from the task index and send the data accordingly.
The result computed by each AllReduceSum, however, is sent to every downstream task.
private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> {
    public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values, Collector <Tuple3 <Integer, Integer, double[]>> out) {
        int taskId = getRuntimeContext().getIndexOfThisSubtask();
        int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();

        double[] sendBuf = context.getObj(bufferName);
        int sendLen = lengthName != null ? context.getObj(lengthName) : sendBuf.length;
        int pieces = pieces(sendLen);
        DistributedInfo distributedInfo = new DefaultDistributedInfo();

        // startPos: where the data received by this task starts in the original buffer; determined by the task index.
        // cnt: how many pieces this task is responsible for.
        int startPos = (int) distributedInfo.startPos(taskId, numOfSubTasks, pieces);
        int cnt = (int) distributedInfo.localRowCnt(taskId, numOfSubTasks, pieces);

        double[][] sum = new double[cnt][];
        double[] agg = new double[cnt];
        do {
            Tuple3 <Integer, Integer, double[]> val = it.next();
            int localPos = val.f1 - startPos;
            if (sum[localPos] == null) {
                sum[localPos] = val.f2;
                agg[localPos]++;
            } else {
                op.accept(sum[localPos], val.f2);
            }
        } while (it.hasNext());

        // The subtask index is still used as the partition key.
        // Note that the result is sent to all downstream tasks.
        for (int i = 0; i < numOfSubTasks; ++i) {
            for (int j = 0; j < cnt; ++j) {
                // startPos is where the data handled by this task starts in the original buffer.
                // The same data is sent to every task i. startPos + j matters: it tells downstream task i
                // where in the predefined variable to store the received data.
                out.collect(Tuple3.of(i, startPos + j, sum[j]));
            }
        }
    }
}

sum = {double[1][]@10605}
 0 = {double[4096]@10613}
  0 = 118.50000000000001
  1 = 77.7
  2 = 37.2
  3 = 5.9
  4 = 25.0
  5 = 621.1000000000001
  6 = 284.7
  7 = 487.59999999999997
  8 = 166.5
  9 = 99.0
  10 = 136.9
  11 = 95.7
  12 = 39.0
  13 = 7.4
  14 = 26.0
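The accumulation loop in AllReduceSum can be reproduced with plain arrays. This is a sketch under assumptions: `PartialSumDemo` and `reduce` are hypothetical names, the operator is assumed to be an element-wise in-place addition (which is what the call `op.accept(sum[localPos], val.f2)` suggests), and the first payload is cloned here so the caller's array is not modified, whereas the excerpt reuses `val.f2` directly.

```java
import java.util.Arrays;
import java.util.function.BiConsumer;

/** Sums all pieces that arrive for the same buffer position (hypothetical illustration). */
class PartialSumDemo {
    /** Assumed reduce operator: element-wise, in-place addition of b into a. */
    static final BiConsumer<double[], double[]> SUM = (a, b) -> {
        for (int i = 0; i < a.length; i++) {
            a[i] += b[i];
        }
    };

    /**
     * pieceIdx[m] is the global piece index of payloads[m]. This task owns the pieces
     * startPos .. startPos + cnt - 1 and reduces all payloads with the same index.
     */
    static double[][] reduce(int startPos, int cnt, int[] pieceIdx, double[][] payloads,
                             BiConsumer<double[], double[]> op) {
        double[][] sum = new double[cnt][];
        for (int m = 0; m < pieceIdx.length; m++) {
            int localPos = pieceIdx[m] - startPos;
            if (sum[localPos] == null) {
                sum[localPos] = payloads[m].clone(); // first arrival initializes the slot
            } else {
                op.accept(sum[localPos], payloads[m]); // later arrivals are folded in
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Three senders each deliver their copy of global piece 2 to the task that owns it.
        double[][] payloads = { {1, 2}, {3, 4}, {5, 6} };
        double[][] sum = reduce(2, 1, new int[] {2, 2, 2}, payloads, SUM);
        System.out.println(Arrays.toString(sum[0])); // [9.0, 12.0]
    }
}
```

Subtracting `startPos` turns the global piece index into a local slot, so each task needs storage only for the pieces it owns.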
5. AllReduceBroadcastSum
AllReduceSum also uses a custom partitioner (partitionCustom) when sending data downstream, with the subtask index as the partition key.
This works the same way as the earlier partitionCustom.
6. AllReduceRecv
All workers merge the partial results into the final result and put it into the session context under the predefined object name.
Each downstream AllReduceRecv receives the summed pieces from every upstream AllReduceSum and stores each piece in a different part of the predefined variable state in its own TaskManager (the part is computed from val.f1 of the received data).
As described above, AllReduceSend and AllReduceRecv compute the position of the data within the shared variable in the same way, which is what allows AllReduceRecv to merge the pieces.
In this way every worker merges the partial sums into the final result and places it in the predefined context variable.
private static class AllReduceRecv<T> extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, T> {
    private final String bufferName;
    private final String lengthName;
    private final int sessionId;

    @Override
    public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values, Collector <T> out) throws Exception {
        ComContext context = new ComContext(sessionId, getIterationRuntimeContext());
        Iterator <Tuple3 <Integer, Integer, double[]>> it = values.iterator();
        if (!it.hasNext()) {
            return;
        }
        double[] recvBuf = context.getObj(bufferName);
        int recvLen = lengthName != null ? context.getObj(lengthName) : recvBuf.length;
        // Same scheme as AllReduceSend: compute where in the shared variable the data should be stored.
        int pieces = pieces(recvLen);
        do {
            Tuple3 <Integer, Integer, double[]> val = it.next();
            if (val.f1 == pieces - 1) {
                System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, lastLen(recvLen));
            } else {
                // Copy into the corresponding part of the shared variable.
                // val.f1 is sent by the upstream task and serves as the starting point for the merge.
                System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE);
            }
        } while (it.hasNext());
    }
}

val = {Tuple3@10672} "(3,0,[335.3, 150.8999999999999998, 277.5, 99.799999999998, 50.0, 290.9, 136.3, 213.1, 67.8, 50.0, 250.3, 170.89999999999998, 73.2, 12.2, 50.0, 0.0....."
 f0 = {Integer@10682} 3
  value = 3
 f1 = {Integer@10638} 0
  value = 0
 f2 = {double[4096]@10674}
  0 = 335.3
  1 = 150.89999999999998
  2 = 277.5
  3 = 99.799999999998
  4 = 50.0
  5 = 290.9
  6 = 136.3
  7 = 213.1
  8 = 67.8
  9 = 50.0
  10 = 250.3
  11 = 170.89999999999998
  12 = 73.2
  13 = 12.2
  14 = 50.0
  15 = 0.0
  ......

// Each task has received the reduce-sum result.
recvBuf = {double[15]@10666}
 0 = 404.3
 1 = 183.1
 2 = 329.3
 3 = 117.2
 4 = 61.0
 5 = 250.3
 6 = 170.8999999999999998
 7 = 73.20000000000002
 8 = 12.9 = 50.0
 10 = 221.89999999999998
 11 = 104.1
 12 = 161.29999999999998
 13 = 50.4
 14 = 39.0
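The merge on the receiving side comes down to copying each piece to `pieceIndex * pieceSize`. The sketch below illustrates this with hypothetical names (`RecvMergeDemo`, `mergeAll`), a configurable piece size instead of `TRANSFER_BUFFER_SIZE`, and `Math.min` in place of the `lastLen` helper, which the excerpt does not show.

```java
import java.util.Arrays;

/** Writes received pieces back into the shared buffer by piece index (hypothetical illustration). */
class RecvMergeDemo {
    /** pieceIdx[m] is the global index carried with payloads[m] (val.f1 in the excerpt). */
    static double[] mergeAll(int recvLen, int pieceSize, int[] pieceIdx, double[][] payloads) {
        double[] recvBuf = new double[recvLen];
        for (int m = 0; m < pieceIdx.length; m++) {
            int start = pieceIdx[m] * pieceSize;
            // The last piece may be shorter than pieceSize; copy only the valid part.
            int len = Math.min(pieceSize, recvLen - start);
            System.arraycopy(payloads[m], 0, recvBuf, start, len);
        }
        return recvBuf;
    }

    public static void main(String[] args) {
        // Pieces arrive in arbitrary order; the index alone decides where each one lands.
        int[] pieceIdx = {2, 0, 1};
        double[][] payloads = { {5, 0}, {1, 2}, {3, 4} };
        System.out.println(Arrays.toString(mergeAll(5, 2, pieceIdx, payloads)));
        // [1.0, 2.0, 3.0, 4.0, 5.0]
    }
}
```

Each index maps to a disjoint range of the buffer, so arrival order does not matter and no two pieces overwrite each other.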
7. KMeansUpdateCentroids
The new cluster centers are computed from the point counts and coordinate sums. The shared variable CENTROID_ALL_REDUCE, which AllReduce stored in the TaskManager, is read here.
/**
 * Update the centroids based on the sum of points and point number belonging to the same cluster.
 */
public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        // Fetch the shared variable stored by AllReduce.
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
        stepNumCentroids.f0 = context.getStepNo();
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}
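The arithmetic behind the update is a division of each coordinate sum by the cluster's weight. The sketch below is not the real `updateCentroids`, whose body is not shown here and may do more. `CentroidUpdateDemo` and `centroids` are hypothetical names, and the layout (per cluster, `vectorSize` coordinate sums followed by one weight) is inferred from the allocation `new double[k * (vectorSize + 1)]` and from the debugger output, where entries 4, 9 and 14 add up to the 19 training samples.

```java
import java.util.Arrays;

/** Derives centroids from an all-reduced sum matrix (hypothetical illustration, assumed layout). */
class CentroidUpdateDemo {
    /**
     * Assumed layout: for cluster c, entries c*(vectorSize+1) .. c*(vectorSize+1)+vectorSize-1
     * hold the coordinate sums and the following entry holds the weight (point count).
     */
    static double[][] centroids(double[] sumMatrixData, int k, int vectorSize) {
        double[][] result = new double[k][vectorSize];
        for (int c = 0; c < k; c++) {
            int base = c * (vectorSize + 1);
            double weight = sumMatrixData[base + vectorSize];
            if (weight > 0) { // an empty cluster is left at the origin in this sketch
                for (int d = 0; d < vectorSize; d++) {
                    result[c][d] = sumMatrixData[base + d] / weight;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two clusters in 2-D: sums (2, 4) over 2 points and sums (9, 3) over 3 points.
        double[] sumMatrixData = {2, 4, 2, 9, 3, 3};
        System.out.println(Arrays.deepToString(centroids(sumMatrixData, 2, 2)));
        // [[1.0, 2.0], [3.0, 1.0]]
    }
}
```

After AllReduce every worker holds the same `sumMatrixData`, so every worker computes the same centroids without further communication.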