The previously published version of this article contained errors and was split into two parts; it is now published here in full.
0x00 Summary
Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of the real-time computing engine Flink. It is the first machine learning platform in the industry to support both batch and streaming algorithms. Iterative algorithms are used in many areas of data analysis, such as machine learning and graph computing. This article starts from Superstep to see how Alink uses the Flink iteration API to implement specific algorithms.
Since there is very little public information about Alink, the following is largely personal inference. Corrections are welcome, and I will update the article as needed.
0x01 Motivation
The reason the concept of Superstep comes up is that several strange things appear when reading the KMeans code. For example, in the following three classes, context.getStepNo() is used, and different business operations are performed depending on its value:
public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        LOG.info("liuhao KMeansPreallocateCentroid ");
        if (context.getStepNo() == 1) {
            /** Allocate memory for pre-round centers and current centers. */
        }
    }
}

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ...
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
        /** Specific business logic code:
         * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
        /** Update the centroids based on the sum of points and point number belonging to the same cluster. */
    }
}
Checking the ComContext source, we find that stepNo comes from runtimeContext.getSuperstepNumber().
public class ComContext {
    private final int taskId;
    private final int numTask;
    private final int stepNo; // Yes, this is it
    private final int sessionId;

    public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
        this.sessionId = sessionId;
        this.numTask = runtimeContext.getNumberOfParallelSubtasks();
        this.taskId = runtimeContext.getIndexOfThisSubtask();
        this.stepNo = runtimeContext.getSuperstepNumber(); // The variable is initialized here
    }

    /**
     * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}.
     * @return iteration step number.
     */
    public int getStepNo() {
        return stepNo; // And used here
    }
}
Some readers may be shocked at this point: isn't this the concept of the BSP model? I only wanted to write a KMeans algorithm; why, besides the MPI model, do I also have to consider the BSP model? Let's look step by step at what Alink actually does.
0x02 Background concepts
2.1 The four-layer execution graph
The execution graph in Flink can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
- StreamGraph: The original graph generated by code written by the Stream API. Used to represent the topology of a program.
- JobGraph: StreamGraph is optimized to generate the JobGraph, which is the data structure submitted to the JobManager. The main optimization is to chain multiple qualified nodes together as a node, which can reduce the serialization/deserialization/transmission consumption required by data flowing between nodes. The JobGraph is the only data structure that represents jobs that Flink’s data flow engine recognizes, and it is this common abstraction that represents the unity of stream and batch processing at run time.
- ExecutionGraph: The JobManager generates an ExecutionGraph from the JobGraph. ExecutionGraph, a parallel version of JobGraph, is the core data structure of the scheduling layer.
- Physical execution graph: the "graph" formed after the JobManager schedules the job according to the ExecutionGraph and deploys tasks onto the individual TaskManagers; it is not a concrete data structure.
2.2 Task and SubTask
For historical reasons, the use of these two concepts inside Flink is itself confusing: in the TaskManager, the concept of a subtask is implemented by a class named Task, so when the TaskManager talks about a Task object, it actually corresponds to a subtask in the ExecutionGraph.
So these two concepts need to be clarified.
- Task: Task corresponds to a node of the JobGraph and is an Operator. A Task is a collection of multiple subtasks with the same function in a phase, similar to the TaskSet in Spark.
- SubTask: A subTask is the smallest unit of task execution in Flink. It is an instance of a Java class with properties and methods that perform the concrete computation logic. In the ExecutionGraph, tasks are decomposed into multiple subtasks that execute in parallel; each subtask is assigned to a TaskManager as an execution.
- Operator Chains: Multiple operators without shuffle are merged in a single subTask to form Operator Chains, similar to the Pipeline in Spark. The number of Operator subtasks refers to the degree of parallelism of the Operator. Different operators of the same program may also have different degrees of parallelism (because you can modify the degree of parallelism with the setParallelism() method).
The programs in Flink are essentially parallel. Each Operator (Transformation) has one or more Operator subtasks (Operator subTask) during execution. Each Operator subTask is independent of each other and executed in a different thread. And may be executed on different machines or containers.
Task (SubTask) is a Runnable object; after receiving a TDD, the TaskManager instantiates it into a Task object and starts a thread to execute the Task's run method.
TaskDeploymentDescriptor (TDD): the data structure that submitTask passes to the TaskManager. It contains all the description information about the Task, such as:
- TaskInfo: contains the Java class executed by the Task, which is an implementation of AbstractInvokable and also the task class of a concrete operator (such as DataSourceTask, DataSinkTask, BatchTask, StreamTask, etc.).
- IG description: usually contains one or two InputGateDeploymentDescriptors (IGD).
- Description of the target RP: PartitionId, PartitionType, number of RS, etc.
2.3 How to Divide Tasks
Tasks are re-divided in the following situations:
- The degree of parallelism changes;
- keyBy()/window()/apply() causes a rebalance;
- startNewChain() is called to start a new operator chain;
- disableChaining() is called, telling the current operator not to participate in operator chaining.
For example, consider the following operations:
DataStream<String> text = env.socketTextStream(hostname, port);
DataStream counts = text
.filter(new FilterClass())
.map(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(2);
So the transformation stream of the StreamGraph is:
Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink
It contains four tasks:
- Source --> Filter --> Map
- keyBy
- timeWindow
- Sink
Each task is divided into several subtasks: at runtime, a Task is executed in parallel as several subTask instances, and each subTask corresponds to one execution thread.
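To make the chaining controls from the list above concrete, here is a hedged sketch using the DataStream API. The lambdas and the parallelism value are made up purely for illustration; only startNewChain(), disableChaining() and setParallelism() are the points of interest, and env, hostname and port are assumed to be set up as in the example above.
// A hedged illustration, not code from this article's job.
DataStream<String> text = env.socketTextStream(hostname, port);
text
    .filter(line -> !line.isEmpty())
    .map(String::toUpperCase).startNewChain()   // force a new operator chain to start at this map
    .map(String::trim).disableChaining()        // keep this operator out of any chain entirely
    .setParallelism(4)                          // a different parallelism also splits the chain
    .print();
Each of these calls produces an extra task boundary, so the resulting JobGraph contains more (smaller) tasks than the fully chained version.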
2.4 JobGraph
All of this background is to introduce JobGraph and SubTask, because the source-code analysis in this article starts from the JobGraph, and the debugging is done at the SubTask level.
The JobGraph is generated on the basis of the StreamGraph by merging eligible StreamNodes. For a data processing chain such as source -> flatMap -> reduce -> sink, when source and flatMap satisfy the chaining conditions, the two operators can be executed in the same thread, which reduces data transfer over the network. Since the data passed between source and flatMap no longer needs to be serialized and deserialized, the execution efficiency of the program also improves.
Compared to the StreamGraph and the OptimizedPlan, the JobGraph is not a completely “static” data structure because of the addition of the “dynamic” concept of an intermediate result set.
JobVertex, IntermediateDataSet, and JobEdge are the basic elements of the JobGraph. These three objects depend on each other:
- A JobVertex associates several JobEdge as input and several intermediateDatasets as the result set produced by it; Each JobVertex has attributes such as degree of parallelism and executing code.
- An IntermediateDataSet associates a JobVertex as the producer and several JobEdge as consumers;
- A JobEdge is associated with an IntermediateDataSet which can be considered the source and a JobVertex which can be considered the target consumer;
So how does the JobGraph organize and store these elements? The JobGraph stores all JobVertex entries as a Map, with the key being JobVertexID:
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
The rest of the elements can be found by relationship using JobVertex. Note that the feedback edge for iteration is not currently reflected in the JobGraph, but is embedded in a special JobVertex to establish a relationship between them through the feedback channel.
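To make these associations concrete, here is a small hedged sketch that walks a JobGraph along exactly the relationships described above. The getter names are taken from flink-runtime (org.apache.flink.runtime.jobgraph) and are assumed to match the Flink version at hand.
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

// A hedged sketch: for every JobVertex, print which intermediate data sets it consumes
// and which vertex produced them.
static void dumpJobGraph(JobGraph jobGraph) {
    for (JobVertex vertex : jobGraph.getVertices()) {
        System.out.println(vertex.getName() + ", parallelism = " + vertex.getParallelism());
        for (JobEdge input : vertex.getInputs()) {
            IntermediateDataSet source = input.getSource();
            System.out.println("    consumes " + source.getId()
                    + " produced by " + source.getProducer().getName());
        }
    }
}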
2.5 BSP model and Superstep
BSP model
The BSP model is a parallel computing model. A parallel computing model usually refers to an abstract model, obtained by abstracting the basic characteristics of various parallel computers (or at least one class of them), that is used for the design and analysis of parallel algorithms.
The BSP model is an asynchronous MIMD-DM model (DM: distributed memory, SM: shared memory). It supports message passing, with asynchronous parallelism within a block and explicit synchronization between blocks. The model is based on a master coordinator; all workers execute synchronously (lock-step) and read data from the input queue.
The BSP computing model is not only an architecture model but also a method for designing parallel programs. The BSP programming principle is bulk synchrony, whose distinctive feature is the introduction of the superstep concept. A BSP program has both a horizontal and a vertical structure; vertically, a BSP program consists of a series of serial supersteps.
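The superstep rhythm can be illustrated with a minimal, self-contained sketch in plain Java (using a CyclicBarrier, not code from any BSP framework): each worker thread performs its local computation for the superstep and then waits at a barrier until all workers have finished, which is exactly the "local computation + communication + barrier synchronization" structure described above. Worker and superstep counts are arbitrary.
import java.util.concurrent.CyclicBarrier;

public class BspSketch {
    public static void main(String[] args) {
        final int workers = 3;
        final int supersteps = 4;
        // The barrier action runs once per superstep, after all workers have arrived.
        final CyclicBarrier barrier = new CyclicBarrier(workers,
                () -> System.out.println("---- superstep barrier reached ----"));

        for (int w = 0; w < workers; w++) {
            final int workerId = w;
            new Thread(() -> {
                try {
                    for (int step = 1; step <= supersteps; step++) {
                        // 1. local computation (and message sending) within the superstep
                        System.out.println("worker " + workerId + " computes superstep " + step);
                        // 2. barrier synchronization: wait until every worker finishes this superstep
                        barrier.await();
                    }
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}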
Implementation of BSP model
The BSP model is roughly implemented as follows:
- Pregel: Google’s large-scale graph computing framework, which first proposed the application of BSP model to graph computing, see Pregel – Large-scale Graph Processing System, which is not open source yet.
- Apache Giraph: an ASF incubator project contributed by Yahoo!, a Java implementation of BSP. It focuses on iterative graph computation (such as PageRank, shortest paths, etc.); each job is a Hadoop job without a reducer.
- Apache Hama: Also the ASF community’s Incubator project, unlike Giraph, this is a pure Java implementation of the BSP model, and not just for graph calculation, but intended to provide a general purpose application framework for the BSP model.
Flink-Gelly
Flink-gelly uses Flink’s efficient iterative operator to support iterative graph processing of massive data. Currently, Flink Gelly provides implementations of “Vertex-Centric”, “Scatter-Gather” and “Gather- sum-apply” computing models.
The “Vertex-Centric” iterative model, also known as “Pregel,” is a Vertex approach to graph computing. Among them, the step of synchronous iterative computation is called “superstep”. In each “superstep”, each vertex performs a user-defined function, and the vertices communicate with each other through messages. When a vertex knows the unique ID of any other vertex in the graph, it can send a message to it.
But KMeans is not graph processing, and Alink is not built on Flink-Gelly; perhaps Alink just borrowed the concept. So we need to dig further.
0x03 Flink’s Iterative Algorithm (Superstep-based)
Iterative algorithms are used in many areas of data analysis, such as machine learning or graph computing. In order to extract useful information from big data, iterative calculation is often needed in the process of processing.
The so-called iterative operation is to give an initial value, use the given algorithm formula to calculate the initial value to get an intermediate result, and then take the intermediate result as an input parameter for repeated calculation, and get the calculation result when certain conditions are met.
There are many big data processing frameworks, such as Spark and MapReduce, but implementing iterative computation efficiently on them is actually quite difficult.
Flink directly supports iterative computation. Flink’s idea of implementing iteration is also very simple. It is to implement a step function and then embed it in the iteration operator. There are two kinds of iterative operators: Iterate and Delta Iterate. Both operators call the step function until they receive a signal to terminate the iteration.
3.1 Bulk Iterate
This kind of iteration is also called full (bulk) iteration: the entire input data set is fed in and, after a certain number of iterations, you obtain the desired result.
The iteration operator includes a simple iteration form: in each iteration, the step function consumes the full amount of data (the input of this iteration and the result of the last iteration), and then calculates the output of the next iteration (for example, Map, Reduce, join, etc.)
The iterative process is mainly divided into the following steps:
- Iteration Input: Is the initial Input value or the result of the last Iteration calculation.
- Step Function: The Step Function is executed on each iteration. It iteratively computes the DataSet, which is composed of a series of operators, such as map, flatMap, join, etc., depending on the specific business logic.
- Next Partial Solution: The result of each iteration, which is sent to the Next iteration.
- Iteration Result: The Result of the last Iteration, which is output to datasink or sent downstream for processing.
The end condition of its iteration is:
- The maximum number of iterations is reached
- Custom convergent aggregation function
You need to call iterate(int), which returns an IterativeDataSet. You can perform operations on it, such as map. The only argument to the Iterate function is the maximum number of iterations.
An iteration forms a loop. The closeWith(DataSet) call is required; its parameter is the DataSet that closes the iteration. Optionally, a termination criterion can be specified: closeWith(DataSet, DataSet) terminates the iteration as soon as the second DataSet (the termination criterion) becomes empty. If no termination criterion is specified, the iteration stops after the maximum number of iterations.
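As a minimal illustration of this API (a toy sketch, not Alink code), the bulk iteration below simply increments an integer for at most 10 supersteps and closes the loop with closeWith; the class name and the trivial step function are made up for illustration.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Iteration input: the initial value; iterate(10) sets the maximum number of supersteps.
        IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10);

        // Step function: executed once per superstep.
        DataSet<Integer> body = initial.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer i) {
                return i + 1;
            }
        });

        // Close the loop; with no termination criterion it runs the full 10 supersteps.
        DataSet<Integer> result = initial.closeWith(body);
        result.print();
    }
}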
3.2 Iteration Mechanism
The DataSet API introduces a unique synchronous iteration mechanism (superstep-based), which is limited to bounded streams.
We refer to each execution of the step function of the iteration operator as a single iteration. In a parallel setup, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms what is called a superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep before the next superstep is initialized. Termination criteria are also evaluated at the superstep synchronization barriers.
Below is the original Apache text
We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.
(The corresponding figure from the Apache documentation is omitted here.)
Summarized as follows:
Each iteration is a superstep; each superstep has several subtasks executing the step on different partitions; each step has a HeadTask, several IntermediateTasks, and a TailTask; each superstep has a SynchronizationSinkTask for synchronization, because all parallel tasks of an iteration need to complete before the next superstep starts.
Superstep is a concept of the Flink DataSet API, but you can see the shadow of the BSP model here. For example:
- In the traditional BSP model, a superstep is divided into three steps: local computation, message delivery, and synchronous barrier.
- Barrier synchronization, also known as fence synchronization: each synchronization marks both the completion of one superstep and the start of the next;
- Superstep: a superstep is one round of the computation; each advance by one "layer" from the start is one superstep;
- The program itself controls when the iteration terminates.
0x04 How does Alink Use iteration
The KMeansTrainBatchOp.iterateICQ function generates an IterativeComQueue, and the IterativeComQueue uses superstep-based iteration.
return new IterativeComQueue()
.initWithPartitionedData(TRAIN_DATA, data)
.initWithBroadcastData(INIT_CENTROID, initCentroid)
.initWithBroadcastData(KMEANS_STATISTICS, statistics)
.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))
.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // Termination conditions
.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
.setMaxIter(maxIter) // Maximum number of iterations
.exec();
BaseComQueue.exec contains:
public DataSet<Row> exec() {
    IterativeDataSet<byte[]> loop // Flink iteration API
        = loopStartDataSet(executionEnvironment)
            .iterate(maxIter);
    // The operations added earlier, e.g. KMeansPreallocateCentroid, all run inside this loop.
    if (null == compareCriterion) {
        loopEnd = loop.closeWith(...);
    } else {
        // Termination criterion
        DataSet<Boolean> criterion = input ... compareCriterion ...;
        loopEnd = loop.closeWith(..., criterion, ...);
    }
}
Looking at the code more closely, we can see:
The superstep consists of:
.add(new KMeansPreallocateCentroid()) .add(new KMeansAssignCluster(distance)) .add(new AllReduce(CENTROID_ALL_REDUCE)) .add(new KMeansUpdateCentroids(distance))
The termination criterion is
A RichMapPartitionFunction is built with KMeansIterTermination as the termination criterion. Finally, KMeansOutputModel is invoked to complete the business operation.
The maximum number of iterations is:
.setMaxIter(maxIter)
Therefore, we can conclude that the superstep-based Bulk Iterate operator is used to implement the overall KMeans iteration, where one iteration is one superstep. However, when communication or barrier synchronization is needed within a superstep, the MPI-style AllReduce is used.
0x05 Delve into the Flink source code and Runtime to verify
We need to dig into Flink internals to verify this. If you are interested, you can follow the call stack below and set your own breakpoints to study it.
execute:56, LocalExecutor (org.apache.flink.client.deployment.executors)
executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java)
execute:860, ExecutionEnvironment (org.apache.flink.api.java)
execute:844, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:35, KMeansExample (com.alibaba.alink)
5.1 Submit Job to Flink
The connection between Alink and Flink is established in the print call. Since this is local debugging, Flink starts a MiniCluster and then does the following.
- The Plan is generated first. The Plan represents the batch program in the form of a data stream, but it is only the initial representation of the batch program, and the Plan is then optimized to produce a more efficient Plan, the OptimizedPlan.
- The plan is then compiled to generate the JobGraph. This is the graph from which Flink generates the tasks.
- Generate a series of configurations.
- Leave the JobGraph and configuration to the Flink cluster to run. If it is not running locally, it will also send the JAR files across the network to other nodes.
- Running in local mode, you can see the startup process, such as starting performance metrics, Web modules, JobManager, ResourceManager, taskManager, and so on.
When we see the submitJob call, we know that the KMeans code has built a connection with Flink.
@Internal
public class LocalExecutor implements PipelineExecutor {
public static final String NAME = "local";
@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
final JobGraph jobGraph = getJobGraph(pipeline, configuration);
final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);
CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);
jobIdFuture
.thenCompose(clusterClient::requestJobResult)
.thenAccept((jobResult) -> clusterClient.shutDownCluster());
return jobIdFuture.thenApply(jobID ->
new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
}
5.2 generate JobGraph
The process of generating the JobGraph is as follows:
- IterativeDataSet.closeWith generates a BulkIterationResultSet.
- PrintBatchOp.sinkFrom calls ExecutionEnvironment.executeAsync.
- createProgramPlan is called to build a Plan.
- In the OperatorTranslation.translate function, if (dataSet instanceof BulkIterationResultSet), then translateBulkIteration(bulkIterationResultSet) is called.
- The Plan is generated.
- ExecutionEnvironment.executeAsync calls LocalExecutor.execute.
- FlinkPipelineTranslationUtil.getJobGraph is then called to generate the JobGraph.
- In GraphCreatingVisitor.preVisit, if (c instanceof BulkIterationBase), a BulkIterationNode is generated.
- PlanTranslator.translateToJobGraph calls JobGraphGenerator.compileJobGraph, which eventually calls createBulkIterationHead to generate the head of the iteration.
- Finally, the JobGraph is submitted to the cluster, where it is transformed into an ExecutionGraph and executed on the JM and TMs.
5.3 Iterating tasks
In the previous code, the function getJobGraph generates the job graph.
The JobManager then generates an ExecutionGraph from the JobGraph. ExecutionGraph, a parallel version of JobGraph, is the core data structure of the scheduling layer.
Finally, after the JobManager schedules the jobs according to the ExecutionGraph, the tasks are deployed on each TaskManager.
So we need to see what tasks the iteration API corresponds to at the final run time.
For the IterativeDataSet, i.e. the superstep-based Bulk Iterate, Flink generates the following tasks:
- IterationHeadTask
- IterationIntermediateTask
- IterationTailTask
- IterationSynchronizationSinkTask
5.3.1 IterationHeadTask
The main role of IterationHeadTask is to coordinate an iteration.
It reads the initial input and establishes a BlockingBackChannel with the iteration tail. After successfully processing the input, it sends an EndOfSuperstep event to its output. At the end of each superstep it contacts the synchronization task and waits until it receives an AllWorkersDoneEvent; this event indicates that all other heads have finished their iteration.
In the next iteration, the output of the tail from the previous iteration is transmitted through the back channel and becomes the input of the head. The decision of when to move on to the next iteration is made by the HeadTask. Once the iteration is complete, the head sends a TerminationEvent to all tasks associated with it, notifying them to shut down.
barrier.waitForOtherWorkers();

if (barrier.terminationSignaled()) {
    requestTermination();
    nextStepKickoff.signalTermination();
} else {
    incrementIterationCounter();
    String[] globalAggregateNames = barrier.getAggregatorNames();
    Value[] globalAggregates = barrier.getAggregates();
    aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
    // Initiate the next superstep here.
    nextStepKickoff.triggerNextSuperstep();
}
IterationHeadTask is built in JobGraphGenerator.createBulkIterationHead. An example:
"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"
5.3.2 IterationIntermediateTask
IterationIntermediateTask is a task in the middle of a superstep; it forwards the EndOfSuperstepEvent and TerminationEvent to all tasks associated with it. In addition, IterationIntermediateTask can update the workset or the solution set of the iteration.
If the iteration state is updated, the output of this task is passed back to IterationHeadTask, in which case the task is scheduled again as head.
IterationIntermediateTask examples are as follows:
"MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
"Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
"MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
"Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
5.3.3 IterationTailTask
IterationTailTask is the end of the iteration. If the iteration state is updated, the output of this task is sent back to IterationHeadTask via BlockingBackChannel. Feedback to the iteration head indicates that the full logic of the iteration has been completed and the iteration closure loop can be closed. In this case, the task will be rescheduled on the instance where the head resides.
Here are a few key points to note:
How to connect with Head
Flink has a class BlockingQueueBroker, a broker of blocking queues that controls iteration concurrency. Brokers are singletons, and the iteration head and tail tasks generate the same broker key, so head and tail communicate within the same JVM through the same dataChannel. The dataChannel is created by the iteration head.
IterationHeadTask generates BlockingBackChannel, which is a blocking queue of capacity 1.
// Generate the channel
BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager()));

// Then block here, waiting for the Tail
superstepResult = backChannel.getReadEndAfterSuperstepEnded();
IterationTailTask is as follows:
// Get the channel from the broker; because the broker is a singleton, this is the same channel as the head's
worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());
// Notify Iteration head if responsible for workset update
worksetBackChannel.notifyOfEndOfSuperstep();
Within the same subtask, the two sides use the same brokerKey, so the head and the tail are linked together.
public String brokerKey() {
    if (this.brokerKey == null) {
        int iterationId = this.config.getIterationId();
        this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }
    return this.brokerKey;
}
How to pass the value returned by the user to the Head
This is done through output.collect.
First, when the tail is initialized, an outputCollector is generated and set as the last output collector of this task. This ensures that the output of the user function is forwarded to this outputCollector.
The output of the outputCollector is the output buffer of the worksetBackChannel; both are set to the same instance. The user output is therefore written into the back channel.
@Override
protected void initialize() throws Exception {
    super.initialize();

    // set the last output collector of this task to reflect the iteration tail state update:
    // a) workset update,
    // b) solution set update, or
    // c) merged workset and solution set update
    Collector<OT> outputCollector = null;
    if (isWorksetUpdate) {
        // Generate an outputCollector
        outputCollector = createWorksetUpdateOutputCollector();
        // we need the WorksetUpdateOutputCollector separately to count the collected elements
        if (isWorksetIteration) {
            worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
        }
    }
    ...
    // Set outputCollector as the last output of this task
    setLastOutputCollector(outputCollector);
}
The output of the outputCollector is the output buffer of the worksetBackChannel, which is set to the same instance.
protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
DataOutputView outputView = worksetBackChannel.getWriteEnd();
TypeSerializer<OT> serializer = getOutputSerializer();
return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
}
The run method is as follows:
@Override
public void run() throws Exception {
    SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());

    while (this.running && !terminationRequested()) {
        // The user output happens here; it goes to output.collect, i.e. the output buffer of the worksetBackChannel.
        super.run();

        // After the output to the channel is finished, notify the head that it can read.
        if (isWorksetUpdate) {
            // notify iteration head if responsible for workset update
            worksetBackChannel.notifyOfEndOfSuperstep();
        } else if (isSolutionSetUpdate) {
            // notify iteration head if responsible for solution set update
            solutionSetUpdateBarrier.notifySolutionSetUpdate();
        }
        ...
    }
}
The IterationTailTask example is as follows:
"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"
5.3.4 IterationSynchronizationSinkTask
The role of IterationSynchronizationSinkTask is to synchronize all iteration heads; it is implemented as an output task. It is used purely for coordination and does not process any data.
In every superstep, IterationSynchronizationSinkTask simply waits until it has received a WorkerDoneEvent from every head, which means the next superstep can begin.
The important point here is how the SynchronizationSinkTask waits for the headTask of each parallel instance. For example, if Flink's parallelism is 5, how does the SynchronizationSinkTask wait for these 5 headTasks?
In IterationSynchronizationSinkTask, a SyncEventHandler is registered on WorkerDoneEvent to wait for the heads.
this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader());
this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);
In SyncEventHandler we can see that, at construction time, numberOfEventsUntilEndOfSuperstep is set to the parallelism. Every time a WorkerDoneEvent is received, workerDoneEventCounter is incremented; when it reaches numberOfEventsUntilEndOfSuperstep (i.e. the parallelism), all head tasks have finished this superstep.
private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
    if (this.endOfSuperstep) {
        throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
    } else {
        // Incremented for every WorkerDoneEvent received
        ++this.workerDoneEventCounter;
        String[] aggNames = workerDoneEvent.getAggregatorNames();
        Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
        if (aggNames.length != aggregates.length) {
            throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
        } else {
            for (int i = 0; i < aggNames.length; ++i) {
                Aggregator<Value> aggregator = (Aggregator) this.aggregators.get(aggNames[i]);
                aggregator.aggregate(aggregates[i]);
            }
            // numberOfEventsUntilEndOfSuperstep is the parallelism; reaching it means all heads have finished this superstep.
            if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
                this.endOfSuperstep = true;
                Thread.currentThread().interrupt();
            }
        }
    }
}
An IterationSynchronizationSinkTask example is as follows:
"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"
5.4 superstep
To sum up, we finally get the superstep as follows:
***** Text description *****
Each iteration is a superstep.
Each superstep has several subtasks executing the step on different partitions.
Each step has a HeadTask, several IntermediateTasks, and a TailTask.
Each superstep has a SynchronizationSinkTask for synchronization, because all parallel tasks of an iteration need to complete before the next superstep starts.

***** Pseudocode roughly as follows *****
for maxIter:
    begin superstep
        for each parallel subtask:
            begin step
                IterationHeadTask
                IterationIntermediateTask
                ...
                IterationIntermediateTask
                IterationTailTask
            end step
        IterationSynchronizationSinkTask
    end superstep
0x06 Looking at superstep together with the KMeans code
6.1 Summary of K-means algorithm
The K-means algorithm flow, described informally (and therefore not entirely rigorously) with a "birds of a feather flock together" gang metaphor, goes roughly like this; a plain-Java sketch follows the list:
- First input the value of k, i.e. we want to cluster the data set into k groups.
- Randomly select k data points from the data set as the initial "big brothers" (centroids).
- For every remaining point ("little brother") in the set, compute its distance to each big brother (the meaning of "distance" is discussed later). Whichever big brother is closest, that little brother follows him.
- At this point every big brother has gathered a crowd of little brothers; a general assembly is held, and each group elects a new big brother (in fact, a new centroid is computed by the algorithm).
- If the distance between the new big brother and the old big brother is less than a set threshold (meaning the recomputed centroid barely moved, i.e. it has stabilized or converged), we consider the clustering to have reached the desired result and terminate the algorithm.
- If the new big brother and the old big brother are still far apart, repeat steps 3 to 5.
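To pin down the steps above without the metaphor, here is a minimal standalone sketch in plain Java (not Alink code) for 2-D points with Euclidean distance; the sample points, k, the iteration cap and the tolerance are all made up for illustration.
import java.util.Arrays;

public class KMeansSketch {
    public static void main(String[] args) {
        double[][] points = {{1, 1}, {1.5, 2}, {8, 8}, {8.5, 9}, {0.5, 1.2}, {9, 8.2}};
        int k = 2, maxIter = 20;
        double tol = 1e-4;
        // Step 1-2: pick k initial centroids (here simply two of the points).
        double[][] centroids = {points[0].clone(), points[2].clone()};
        for (int iter = 0; iter < maxIter; iter++) {
            // Step 3: assign every point to its closest centroid, accumulating sums and counts.
            double[][] sums = new double[k][2];
            int[] counts = new int[k];
            for (double[] p : points) {
                int best = 0;
                for (int c = 1; c < k; c++) {
                    if (dist(p, centroids[c]) < dist(p, centroids[best])) best = c;
                }
                sums[best][0] += p[0];
                sums[best][1] += p[1];
                counts[best]++;
            }
            // Step 4: recompute centroids; steps 5-6: stop when they barely move.
            double shift = 0;
            for (int c = 0; c < k; c++) {
                if (counts[c] == 0) continue;
                double[] newCentroid = {sums[c][0] / counts[c], sums[c][1] / counts[c]};
                shift = Math.max(shift, dist(newCentroid, centroids[c]));
                centroids[c] = newCentroid;
            }
            if (shift < tol) break;
        }
        System.out.println(Arrays.deepToString(centroids));
    }

    static double dist(double[] a, double[] b) {
        double dx = a[0] - b[0], dy = a[1] - b[1];
        return Math.sqrt(dx * dx + dy * dy);
    }
}
In Alink, the assignment step corresponds to KMeansAssignCluster, the recomputation step to KMeansUpdateCentroids, and the convergence check to the termination criterion of the iteration, as described in the following subsections.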
6.2 KMeansPreallocateCentroid
KMeansPreallocateCentroid is also part of the superstep, but only when context.getStepNo() == 1 does it enter the actual business logic and pre-allocate the centroids. When the superstep number is greater than 1, this task is still executed, but it does not enter the concrete business code.
public class KMeansPreallocateCentroid extends ComputeFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class);
    @Override
    public void calc(ComContext context) {
        // Every superstep enters here
        LOG.info(" KMeansPreallocateCentroid: I am entered every time ");
        if (context.getStepNo() == 1) {
            // The actual pre-allocation business logic is entered only once
        }
    }
}
6.3 KMeansAssignCluster and KMeansUpdateCentroids
The function of KMeansAssignCluster is to find the nearest cluster center for each point and to accumulate the count and coordinate sums of the points belonging to each cluster center.
The function of KMeansUpdateCentroids is to compute new cluster centers based on the accumulated point counts and coordinate sums.
Throughout the computation, Alink maintains dedicated shared state to remember the current centroid results.
This is why odd and even iterations need to be distinguished: odd steps use one centroid buffer and even steps use the other. Each superstep only updates one batch of "big brothers", leaving the other batch available for distance comparison.
It should also be noted that in ordinary iterative computation, user data is sent back to the head through the tail, but the KMeans implementation does not take this approach. Instead, the computed center points are stored in shared variables and exchanged among the intermediate tasks.
public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ...
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
        /** Specific business logic code:
         * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
        /** Update the centroids based on the sum of points and point number belonging to the same cluster. */
    }
}
6.4 KMeansOutputModel
A special note here: KMeansOutputModel produces the final output model, and in this KMeans implementation every subtask holds all of the center points, i.e. every subtask has the same model, so there is no need for all of them to output it. The output is therefore restricted to the first subtask, and the other subtasks output nothing.
@Override
public List<Row> calc(ComContext context) {
    // Only the first subtask outputs model data.
    if (context.getTaskId() != 0) {
        return null;
    }
    ...
    modelData.params = new KMeansTrainModelData.ParamSummary();
    modelData.params.k = k;
    modelData.params.vectorColName = vectorColName;
    modelData.params.distanceType = distanceType;
    modelData.params.vectorSize = vectorSize;
    modelData.params.latitudeColName = latitudeColName;
    modelData.params.longtitudeColName = longtitudeColName;

    RowCollector collector = new RowCollector();
    new KMeansModelDataConverter().save(modelData, collector);
    return collector.getRows();
}
0xEE Personal information
Thoughts on life and technology
Wechat public account: Rosie’s Thinking
If you would like timely notifications of my new articles, or want to see the technical materials I recommend, please follow the account.
0xFF Reference
Differences between several parallel computing models (BSP LogP PRAM)
Ci.apache.org/projects/fl… Clustering, K-means, examples, details
Flink-gelly: Iterative Graph Processing
From BSP model to Apache Hama
Flink DataSet iterative operation
Flink architecture, source code and debug
Flink Dataflow, Task, subTask, Operator Chains, Slot introduction
Flink tasks and scheduling
The generating job diagram of the Flink runtime