Alink Machine Learning Platform: Design and Architecture Seen from the Source Code

Because of an earlier mistake, this material was published as three separate articles; it is now released in full.

0x00 Summary

Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of the real-time computing engine Flink, and the first machine learning platform in the industry to support both batch and streaming algorithms. This is the second article in the "Alink Ramblings" series; starting from the source code, it analyzes Alink's design philosophy and architecture in detail.

Since little public information about Alink is available, much of what follows is my own inference, and there are bound to be mistakes. Please point them out and I will update the article.

0x01 Alink Design Principles

In the previous article, Alink Ramblings (1): Alink's Design Ideas Seen from the KMeans Implementation, we inferred and summarized several of Alink's design principles:

  • Algorithms are algorithms and Flink is Flink: shield AI algorithm code from Flink as much as possible.

  • Adopt the simplest and most common development language and way of thinking.

  • Borrow the design ideas and development patterns common in existing machine learning libraries, so that developers can switch to Alink seamlessly.

  • Build a middleware or adapter layer that hides Flink while still exploiting its strengths, so that users can develop algorithms quickly.

Let's look at how Alink builds its architecture from top to bottom according to these principles.

To make this easier to follow, let's start with an overview. The Alink system can be divided into three layers (the top-level pipeline, the middle-layer algorithm components, and the bottom-level iterative computing framework) plus the Flink runtime underneath. The following diagram shows how the program executes at each of these four levels.

Pipeline.fit(data).transform(data).print();

// From the perspective of the top-level pipeline
Training pipeline         +-----> [VectorAssembler (Transformer)] -----> [KMeans (Estimator)]
                          |   // After KMeans.fit, a KMeansModel is generated for the transformation
Transformation pipeline   +-----> [VectorAssembler (Transformer)] -----> [KMeansModel (Transformer)]

// From the perspective of the middle-layer algorithm components
Training components       +-----> [MapBatchOp] -----> [KMeansTrainBatchOp]
                          |   // VectorAssemblerMapper in MapBatchOp is the business logic
Transformation components +-----> [MapBatchOp] -----> [ModelMapBatchOp]
                              // VectorAssemblerMapper in MapBatchOp is the business logic
                              // KMeansModelMapper in ModelMapBatchOp is the business logic

// From the perspective of the bottom-level iterative computing framework
Training by framework     +-----> [VectorAssemblerMapper] -----> [KMeansPreallocateCentroid / KMeansAssignCluster / AllReduce / KMeansUpdateCentroids in IterativeComQueue]
                          |   // Mapped to various Flink operators for training
Transformation (directly) +-----> [VectorAssemblerMapper] -----> [KMeansModelMapper]
                              // Mapped to various Flink operators for the transformation

// From the perspective of the Flink runtime
Training                  +-----> map, mapPartition ...
                          |   // e.g. VectorAssemblerMapper.map is called
Transformation            +-----> map, mapPartition ...
                              // e.g. KMeansModelMapper.map is called to transform

0x02 Alink instance code

The example code is still the KMeans algorithm module from the previous article.

Calling the algorithm

public class KMeansExample {
    public static void main(String[] args) throws Exception {
        // ...
        BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);

        VectorAssembler va = new VectorAssembler()
            .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
            .setOutputCol("features");

        KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
            .setPredictionCol("prediction_result")
            .setPredictionDetailCol("prediction_detail")
            .setReservedCols("category")
            .setMaxIter(100);

        Pipeline pipeline = new Pipeline().add(va).add(kMeans);
        pipeline.fit(data).transform(data).print();
    }
}

The main algorithm function

public final class KMeansTrainBatchOp extends BatchOperator<KMeansTrainBatchOp>
	implements KMeansTrainParams<KMeansTrainBatchOp> {

	static DataSet<Row> iterateICQ(/* ... parameters omitted ... */) {
		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}
}

An example algorithm module

KMeansUpdateCentroids computes the new cluster centers from the per-cluster point counts and coordinate sums.

// Update the centroids based on the sum of points and point number belonging to the same cluster.
public class KMeansUpdateCentroids extends ComputeFunction {
    @Override
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        // Even steps update CENTROID2, odd steps update CENTROID1.
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
        stepNumCentroids.f0 = context.getStepNo();
        // The result of updateCentroids is stored back under K (the number of clusters).
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}
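To make the update step concrete, here is a minimal sketch of the arithmetic only, not Alink's implementation. It assumes, for illustration, that the all-reduced array holds, for each of the k clusters, dim coordinate sums followed by that cluster's point count. The class CentroidUpdateSketch and its empty-cluster policy (keep the old center) are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper illustrating a centroid update; NOT Alink's KMeansUpdateCentroids.
// Assumed layout of sumMatrix: for cluster c, entries [c*(dim+1), c*(dim+1)+dim)
// hold coordinate sums, and entry c*(dim+1)+dim holds the number of points.
final class CentroidUpdateSketch {

    /** Returns the new centroids; a cluster with no points keeps its previous centroid. */
    static double[][] updateCentroids(double[][] oldCentroids, double[] sumMatrix, int k, int dim) {
        double[][] updated = new double[k][];
        for (int c = 0; c < k; c++) {
            int base = c * (dim + 1);
            double count = sumMatrix[base + dim];
            if (count == 0) {
                // Empty cluster: this sketch keeps the old center (one possible policy).
                updated[c] = Arrays.copyOf(oldCentroids[c], dim);
                continue;
            }
            updated[c] = new double[dim];
            for (int d = 0; d < dim; d++) {
                // New center = mean of the points assigned to this cluster.
                updated[c][d] = sumMatrix[base + d] / count;
            }
        }
        return updated;
    }
}
```

For example, if cluster 0 receives the points (1, 2) and (3, 4), its sums are (4, 6) with count 2, so its new center is (2, 3).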

0x03 Top level — Pipeline

The design principle behind this layer is to borrow as much as possible from the design ideas and development patterns common in existing machine learning libraries, so that developers can switch seamlessly.

1. Important concepts of machine learning

A typical machine learning process starts with data collection and goes through multiple steps to produce the desired output. This closely resembles a pipeline, and typically includes ETL (extract, transform, load) of the source data, data preprocessing, feature extraction, model training and cross-validation, and prediction on new data.

Let’s start with a few important concepts:

  • Transformer: an algorithm that converts one dataset into another. For example, a model is a Transformer: it takes a test dataset without prediction labels and converts it into a dataset that contains the predicted labels. Transformers also cover feature engineering, such as feature standardization, normalization, discretization and smoothing, one-hot encoding, etc. A Transformer has a transform method that takes new data and applies the transformation to it.
  • Estimator: an abstraction of a learning algorithm, or of a way of training on training data. Every machine learning algorithm that learns a model is an Estimator; in a pipeline it consumes data and produces a Transformer. Technically, an Estimator implements a fit() method that takes feature data and produces a Transformer. For example, a random forest algorithm is an Estimator: calling fit() on feature data trains and returns a random forest model.
  • Pipeline: a workflow that chains multiple stages (Transformers and Estimators) together into a machine learning workflow and produces the final output.
  • Parameter: used to set the parameters of a Transformer or an Estimator.
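The division of labor between these concepts can be sketched in plain Java. This is a simplified illustration, not the Alink or Flink ML API: a "table" is just a List<double[]>, and SimpleTransformer, SimpleEstimator and MeanCenterer are hypothetical names. The point it shows is that fit() on an Estimator returns a Transformer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the concepts above (not Alink classes).
interface SimpleTransformer {
    // Convert one dataset into another.
    List<double[]> transform(List<double[]> input);
}

interface SimpleEstimator {
    // Learn from training data and return the fitted model, which is a Transformer.
    SimpleTransformer fit(List<double[]> input);
}

// Example Estimator: learns per-column means; its model subtracts them (mean centering).
final class MeanCenterer implements SimpleEstimator {
    @Override
    public SimpleTransformer fit(List<double[]> input) {
        int dim = input.get(0).length;
        double[] mean = new double[dim];
        for (double[] row : input) {
            for (int d = 0; d < dim; d++) {
                mean[d] += row[d];
            }
        }
        for (int d = 0; d < dim; d++) {
            mean[d] /= input.size();
        }
        // The fitted model captures the learned means and is itself a Transformer.
        return rows -> {
            List<double[]> out = new ArrayList<>();
            for (double[] row : rows) {
                double[] centered = new double[dim];
                for (int d = 0; d < dim; d++) {
                    centered[d] = row[d] - mean[d];
                }
                out.add(centered);
            }
            return out;
        };
    }
}
```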

2. How Alink implements these concepts

Alink's directory structure shows that it provides these common concepts (some of the code is borrowed from Flink ML).

./java/com/alibaba/alink:
common		operator	params		pipeline
  
./java/com/alibaba/alink/params:
associationrule	evaluation	nlp		regression	statistics
classification	feature		onlinelearning	shared		tuning
clustering	io		outlier		similarity	udf
dataproc	mapper		recommendation	sql		validators

./java/com/alibaba/alink/pipeline:
EstimatorBase.java	ModelBase.java		Trainer.java		feature
LocalPredictable.java	ModelExporterUtils.java	TransformerBase.java	nlp
LocalPredictor.java	Pipeline.java		classification		recommendation
MapModel.java		PipelineModel.java	clustering		regression
MapTransformer.java	PipelineStageBase.java	dataproc		tuning

Of the interfaces PipelineStage, Transformer and Estimator, Transformer and Estimator correspond to the two general machine learning concepts of the same names, and PipelineStage is the base interface of both.

// Base class for a stage in a pipeline. The interface is only a concept, and does not have any actual functionality. Its subclasses must be either Estimator or Transformer. No other classes should inherit this interface directly.
public interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable

// A transformer is a PipelineStage that transforms an input Table to a result Table.
public interface Transformer<T extends Transformer<T>> extends PipelineStage<T>

// Estimators are PipelineStages responsible for training and generating machine learning models.
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends PipelineStage<E>

Next come three abstract classes, PipelineStageBase, EstimatorBase and TransformerBase, which correspond to the three interfaces above. They define basic operations such as fit and transform.

// The base class for a stage in a pipeline, either an EstimatorBase or a TransformerBase.
public abstract class PipelineStageBase<S extends PipelineStageBase<S>>
    implements WithParams<S>, HasMLEnvironmentId<S>, Cloneable

// The base class for estimator implementations.
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
    extends PipelineStageBase<E> implements Estimator<E, M>

// The base class for transformer implementations.
public abstract class TransformerBase<T extends TransformerBase<T>>
    extends PipelineStageBase<T> implements Transformer<T>

Then there is the Pipeline class, which chains Transformers and Estimators together.

// A pipeline is a linear workflow which chains EstimatorBases and TransformerBases to execute an algorithm
public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
	private ArrayList<PipelineStageBase> stages = new ArrayList<>();

	public Pipeline add(PipelineStageBase stage) {
		this.stages.add(stage);
		return this;
	}
}

Finally, the Parameter concept is illustrated by VectorAssemblerParams, which is used in the example.

// Parameters for MISOMapper.
public interface MISOMapperParams<T> extends HasSelectedCols <T>,  HasOutputCol <T>,
	HasReservedCols <T> {}

// parameters of vector assembler.
public interface VectorAssemblerParams<T> extends MISOMapperParams<T> {
ParamInfo <String> HANDLE_INVALID = ParamInfoFactory
		.createParamInfo("handleInvalid", String.class)
		.setDescription("parameter for how to handle invalid data (NULL values)")
		.setHasDefaultValue("error")
		.build();
}
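The ParamInfo pattern above (a typed key with a name and an optional default) and the way parameter interfaces are composed (MISOMapperParams extends HasSelectedCols, HasOutputCol and HasReservedCols) can be imitated in a self-contained sketch. It assumes that fluent setters such as setOutputCol come from Java default methods on mixin interfaces; that is one plausible mechanism, not a description of the real Flink ML/Alink classes, which have richer APIs. All names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed parameter key (simplified stand-in, not Flink ML's ParamInfo).
final class MyParamInfo<V> {
    final String name;
    final V defaultValue; // null means "required, no default"

    MyParamInfo(String name, V defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}

// Hypothetical parameter container keyed by parameter name.
final class MyParams {
    private final Map<String, Object> values = new HashMap<>();

    <V> void set(MyParamInfo<V> info, V value) {
        values.put(info.name, value);
    }

    @SuppressWarnings("unchecked")
    <V> V get(MyParamInfo<V> info) {
        if (values.containsKey(info.name)) {
            return (V) values.get(info.name);
        }
        if (info.defaultValue != null) {
            return info.defaultValue;
        }
        throw new IllegalArgumentException("Missing required parameter: " + info.name);
    }
}

// Mixin base: anything that owns a MyParams gets fluent set/get.
interface WithMyParams<T> {
    MyParams getParams();

    @SuppressWarnings("unchecked")
    default <V> T set(MyParamInfo<V> info, V value) {
        getParams().set(info, value);
        return (T) this;
    }

    default <V> V get(MyParamInfo<V> info) {
        return getParams().get(info);
    }
}

// One "Has*"-style mixin per parameter.
interface HasOutputColSketch<T> extends WithMyParams<T> {
    MyParamInfo<String> OUTPUT_COL = new MyParamInfo<>("outputCol", null);

    default T setOutputCol(String value) {
        return set(OUTPUT_COL, value);
    }
}

interface HasHandleInvalidSketch<T> extends WithMyParams<T> {
    // Same name and default value as VectorAssemblerParams.HANDLE_INVALID.
    MyParamInfo<String> HANDLE_INVALID = new MyParamInfo<>("handleInvalid", "error");
}

// A stage only declares which parameter mixins it supports.
final class AssemblerSketch implements HasOutputColSketch<AssemblerSketch>,
        HasHandleInvalidSketch<AssemblerSketch> {
    private final MyParams params = new MyParams();

    @Override
    public MyParams getParams() {
        return params;
    }
}
```

With this design a stage gains setOutputCol simply by implementing the corresponding mixin, and unset parameters fall back to their declared defaults.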

In general, because Alink represents both models and data as Tables at runtime, the concepts can be summarized as follows:

  • Transformer: converts an input table into an output table.
  • Estimator: converts an input table into a model.
  • Model: converts an input table into an output table.

3. Walking through the pipeline with the example

First, some basic abstract classes:

  • MapTransformer is a Transformer that performs a flat map.
  • ModelBase is the definition of a model, which is also a Transformer.
  • Trainer defines how a model is trained, and is an EstimatorBase.

// Abstract class for a flat map TransformerBase.
public abstract class MapTransformer<T extends MapTransformer<T>>
		extends TransformerBase<T> implements LocalPredictable

// The base class for a machine learning model.
public abstract class ModelBase<M extends ModelBase<M>> extends TransformerBase<M>
    implements Model<M>

// Abstract class for a trainer that train a machine learning model.
public abstract class Trainer<T extends Trainer<T, M>, M extends ModelBase<M>>
	extends EstimatorBase<T, M>

Next are the two classes used in our example:

  • KMeans is a Trainer, and therefore an EstimatorBase;
  • VectorAssembler is a TransformerBase (via MapTransformer).

// This is an EstimatorBase type
public class KMeans extends Trainer<KMeans, KMeansModel> implements
	KMeansTrainParams<KMeans>, KMeansPredictParams<KMeans> {
	@Override
	protected BatchOperator train(BatchOperator in) {
		return new KMeansTrainBatchOp(this.getParams()).linkFrom(in);
	}
}

// This is a TransformerBase type
public class VectorAssembler extends MapTransformer<VectorAssembler>
	implements VectorAssemblerParams<VectorAssembler> {
	public VectorAssembler(Params params) {
		super(VectorAssemblerMapper::new, params);
	}
}

In the example, the two pipeline stages are created separately and then added to the pipeline.

VectorAssembler va = new VectorAssembler(); // setters omitted (see the full example)
KMeans kMeans = new KMeans();               // setters omitted (see the full example)
Pipeline pipeline = new Pipeline().add(va).add(kMeans);

// The pipeline now contains two stages: VectorAssembler and KMeans.

pipeline = {Pipeline@1201} 
 stages = {ArrayList@2853}  size = 2
   
  0 = {VectorAssembler@1199} 
   mapperBuilder = {VectorAssembler$lambda@2859} 
   params = {Params@2860} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
     
  1 = {KMeans@1200} 
   params = {Params@2857} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"

0x04 Middle Tier — Algorithm component

An algorithm component is a middle-layer concept; it can be thought of as the module/layer that actually implements the algorithm. Its main role is to connect the layer above with the layer below.

  • Above it are the pipeline stages: executing a pipeline stage produces an algorithm component, whose job is to translate the pipeline's Estimator or Transformer into a concrete algorithm. Algorithm components are chained together via linkFrom.
  • Below it is the "iterative computing framework". An algorithm component splits the computation/communication of the concrete algorithm into small modules and maps them onto Mapper functions or onto the computation/communication functions of the iterative computing framework, so as to make better use of Flink's strengths.
  • In the iterative computing framework, the two main parts are Mapper functions and computation/communication functions, which correspond to Mapper and ComQueueItem in the code.
  • A Mapper function is a mapping function (the system ships some Mappers, and users can also write their own for their algorithms);
  • Computation/communication functions are written specifically for an algorithm (again, some are built in and some are algorithm-specific).
  • These functions are the business logic (components). The algorithm component only provides the execution rules, and the business logic runs on it as a plug-in.
  • Another way to think about it: an algorithm component is a framework that delegates part of its business logic to a Mapper or ComQueueItem.

For example:

  • KMeans is an Estimator whose algorithm component is KMeansTrainBatchOp. Its business logic (components), also defined in this class, is a series of algorithm classes (ComQueueItem) chained together on an IterativeComQueue.
  • VectorAssembler is a Transformer whose algorithm component is MapBatchOp. Its business logic (component) is VectorAssemblerMapper, whose map function aggregates multiple numeric columns, in order, into a single vector column.
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>   implements KMeansTrainParams <KMeansTrainBatchOp> 
    
// class for a flat map BatchOperator.
public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> 

In essence, calling Estimator.fit or Transformer.transform links operators together through linkFrom, thereby chaining the data flows. This chain is then progressively mapped onto a concrete Flink execution plan.

1. Algorithm operators

AlgoOperator is the base class for operator components, and subclasses include BatchOperator and StreamOperator, which correspond to batch and stream processing, respectively.

// Base class for algorithm operators.
public abstract class AlgoOperator<T extends AlgoOperator<T>>
    implements WithParams<T>, HasMLEnvironmentId<T>, Serializable

// Base class of batch algorithm operators.
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
    // Link this object to BatchOperator using the BatchOperators as its input.
    public abstract T linkFrom(BatchOperator<?>... inputs);

    public <B extends BatchOperator<?>> B linkTo(B next) {
        return link(next);
    }

    public BatchOperator print() throws Exception {
        return linkTo(new PrintBatchOp().setMLEnvironmentId(getMLEnvironmentId()));
    }
}

public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T>

The example code is as follows:

// The input CSV file is converted to a BatchOperator
BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
...
pipeline.fit(data).transform(data).print();
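The linkFrom/linkTo contract can be illustrated without Flink. In the hypothetical, simplified sketch below, operators hold in-memory List<Double> results instead of DataSets and linkFrom computes eagerly; in Alink, linkFrom only wires up a Flink plan that executes later (the example triggers it with print()). SketchOp, SourceOp and ScaleOp are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical base class imitating the shape of Alink's BatchOperator.
abstract class SketchOp<T extends SketchOp<T>> {
    protected List<Double> output = new ArrayList<>();

    // Consume the outputs of upstream operators and compute this operator's output.
    abstract T linkFrom(SketchOp<?>... inputs);

    // linkTo is linkFrom seen from the upstream side.
    <B extends SketchOp<?>> B linkTo(B next) {
        next.linkFrom(this);
        return next;
    }

    List<Double> getOutput() {
        return output;
    }
}

// A source has no inputs; its output is fixed at construction.
final class SourceOp extends SketchOp<SourceOp> {
    SourceOp(Double... values) {
        output = new ArrayList<>(Arrays.asList(values));
    }

    @Override
    SourceOp linkFrom(SketchOp<?>... inputs) {
        throw new UnsupportedOperationException("A source has no inputs");
    }
}

// A one-input map operator, loosely analogous to MapBatchOp.
final class ScaleOp extends SketchOp<ScaleOp> {
    private final double factor;

    ScaleOp(double factor) {
        this.factor = factor;
    }

    @Override
    ScaleOp linkFrom(SketchOp<?>... inputs) {
        List<Double> result = new ArrayList<>();
        for (double v : inputs[0].getOutput()) {
            result.add(v * factor);
        }
        output = result;
        return this;
    }
}
```

Because both linkFrom and linkTo return an operator, calls can be chained fluently, which is what lets a pipeline stage hand its result straight to the next one.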

2. Mapper (a preview)

Mapper belongs to the bottom-level iterative computing framework and holds the business logic (components), as the directory structure shows. It is introduced here, ahead of its own layer, because the pipeline walkthrough relies on it heavily.

./java/com/alibaba/alink/common
linalg mapper model comqueue utils io

The main Mapper classes are defined as follows. They map inputs to outputs, or use a model to map inputs to concrete values (such as predictions).

// Abstract class for mappers.
public abstract class Mapper implements Serializable {}

// Abstract class for mappers with model.
public abstract class ModelMapper extends Mapper {}

// Find the closest cluster center for every point.
public class KMeansModelMapper extends ModelMapper {}

// Mapper with Multi-Input columns and Single Output column(MISO).
public abstract class MISOMapper extends Mapper {}

// This mapper maps many columns to one vector. the columns should be vector or numerical columns.
public class VectorAssemblerMapper extends MISOMapper {}

A Mapper's business logic runs inside an algorithm component, e.g. [VectorAssemblerMapper in MapBatchOp] and [KMeansModelMapper in ModelMapBatchOp].

At runtime, a ModelMapper is connected to the Flink runtime through ModelMapperAdapter. ModelMapperAdapter extends RichMapFunction, holds the ModelMapper as a member variable and executes its business logic in the map operation, and uses a ModelSource as the source of the model data.

In this example, KMeansModelMapper performs the final transformation: it runs inside ModelMapBatchOp, and its map function transforms each row.

3. Built-in algorithm components

Some common algorithm components are built into the system, such as:

  • MapBatchOp performs a flat map based on the input; it is the algorithm component returned by VectorAssembler.
  • ModelMapBatchOp performs a flat map based on a model; it is the algorithm component returned by KMeans (more precisely, by the KMeansModel that KMeans produces).

The linkFrom of these components (ModelMapBatchOp's is shown below) does two things:

  • It links its inputs to itself, forming a logical chain of algorithm components.
  • It maps the business logic onto Flink operators, forming a chain of Flink operators.
public class ModelMapBatchOp<T extends ModelMapBatchOp<T>> extends BatchOperator<T> {
	@Override
	public T linkFrom(BatchOperator<?>... inputs) {
		checkOpSize(2, inputs);

		try {
			BroadcastVariableModelSource modelSource = new BroadcastVariableModelSource(BROADCAST_MODEL_TABLE_NAME);
			// mapper is the mapping function (the business logic)
			ModelMapper mapper = this.mapperBuilder.apply(
					inputs[0].getSchema(),
					inputs[1].getSchema(),
					this.getParams());
			// modelRows is the model
			DataSet<Row> modelRows = inputs[0].getDataSet().rebalance();
			// resultRows is the input data after the mapping
			DataSet<Row> resultRows = inputs[1].getDataSet()
					.map(new ModelMapperAdapter(mapper, modelSource))
					// Broadcast the model so that ModelMapperAdapter can read it later
					.withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);

			TableSchema outputSchema = mapper.getOutputSchema();
			this.setOutput(resultRows, outputSchema);
			return (T) this;
		} catch (Exception ex) {
			throw new RuntimeException(ex);
		}
	}
}

ModelMapperAdapter

ModelMapperAdapter is the adapter that runs the business-logic Mapper on Flink. As the code shows, it retrieves the previously stored mapper and model data, and then executes the algorithm's business logic on top of them.

import java.io.Serializable;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

/**
 * Adapt a {@link ModelMapper} to run within flink.
 *
 * This adapter class holds the target {@link ModelMapper} and its {@link ModelSource}. Upon open(), it will load model rows from {@link ModelSource} into {@link ModelMapper}.
 */
public class ModelMapperAdapter extends RichMapFunction<Row, Row> implements Serializable {

    /** The ModelMapper to adapt. */
    private final ModelMapper mapper;

    /** Load model data from ModelSource when open(). */
    private final ModelSource modelSource;

    public ModelMapperAdapter(ModelMapper mapper, ModelSource modelSource) {
        // mapper is the business logic; modelSource is the broadcast model source
        this.mapper = mapper; // executes the business logic in map()
        this.modelSource = modelSource; // model data source
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Get the model data from the broadcast variable
        List<Row> modelRows = this.modelSource.getModelRows(getRuntimeContext());
        this.mapper.loadModel(modelRows);
    }

    @Override
    public Row map(Row row) throws Exception {
        // Execute the business logic to transform each input row
        return this.mapper.map(row);
    }
}
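The two-phase lifecycle (load the model once in open(), then map each record) can be reproduced in plain Java. This sketch is hypothetical: a Supplier stands in for reading the broadcast variable from the runtime context, the "model" is a single row of linear weights, and ModelMapperSketch, LinearModelMapper and ModelMapperAdapterSketch are illustrative names, not Alink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical counterpart of ModelMapper: the model is loaded once, then rows are mapped.
abstract class ModelMapperSketch {
    abstract void loadModel(List<double[]> modelRows);

    abstract double map(double[] row);
}

// Toy model: the first model row holds linear weights; map() returns the dot product.
final class LinearModelMapper extends ModelMapperSketch {
    private double[] weights;

    @Override
    void loadModel(List<double[]> modelRows) {
        weights = modelRows.get(0);
    }

    @Override
    double map(double[] row) {
        double sum = 0;
        for (int i = 0; i < weights.length; i++) {
            sum += weights[i] * row[i];
        }
        return sum;
    }
}

// Mimics the open()/map() lifecycle of a rich map function such as ModelMapperAdapter.
final class ModelMapperAdapterSketch {
    private final ModelMapperSketch mapper;
    // Stands in for reading the broadcast model from the runtime context.
    private final Supplier<List<double[]>> modelSource;

    ModelMapperAdapterSketch(ModelMapperSketch mapper, Supplier<List<double[]>> modelSource) {
        this.mapper = mapper;
        this.modelSource = modelSource;
    }

    void open() {
        mapper.loadModel(modelSource.get()); // once per parallel task
    }

    double map(double[] row) {
        return mapper.map(row); // once per record
    }

    // Driver for the sketch: open once, then map every input row.
    List<Double> run(List<double[]> rows) {
        open();
        List<Double> out = new ArrayList<>();
        for (double[] row : rows) {
            out.add(map(row));
        }
        return out;
    }
}
```

Loading in open() rather than in map() matters because open() runs once per parallel task, while map() runs once per record.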

4. The training phase: fit

pipeline.fit(data) executes the pipeline one stage at a time. When the next stage is a Transformer, its transform is called; when it is an EstimatorBase, its fit is called, which converts the EstimatorBase into a Transformer, and then that Transformer's transform is called.

4.1 How the pipeline is processed

  1. If the next stage of the pipeline is an EstimatorBase, its fit is called, which converts the Estimator into a TransformerBase (Estimator.fit takes feature data and produces a Transformer).

    (If this stage is not the last Estimator in the pipeline) the resulting TransformerBase is then applied to the data, and processing moves on to the next stage.

    (If this stage is the last Estimator in the pipeline) the resulting TransformerBase is not applied, because no later stage needs to be trained; fitting is effectively finished, and any remaining Transformer stages are simply collected.

  2. If the next stage of the pipeline is a TransformerBase, its transform function is called directly (again, only when it comes before the last Estimator).

  3. Every TransformerBase to be processed, whether produced by an EstimatorBase or taken directly from the pipeline, transforms its input via input = transformers[i].transform(input);. The output of each transformation is assigned back to input for the next stage of the pipeline.

  4. The result is a PipelineModel (itself a Transformer), which is used in the next phase of the process, transform.
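These rules can be condensed into a runnable sketch. It follows the same control flow as Alink's Pipeline.fit (find the index of the last Estimator, fit Estimators, and transform the data only for stages before that index), but the stage types StageSketch, TransformerSketch and EstimatorSketch and the List<Double> data are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stage types (not Alink's PipelineStageBase/TransformerBase/EstimatorBase).
interface StageSketch {}

interface TransformerSketch extends StageSketch {
    List<Double> transform(List<Double> input);
}

interface EstimatorSketch extends StageSketch {
    TransformerSketch fit(List<Double> input);
}

final class PipelineSketch {
    private final List<StageSketch> stages = new ArrayList<>();

    PipelineSketch add(StageSketch stage) {
        stages.add(stage);
        return this;
    }

    // Returns the fitted transformers; together they play the role of the PipelineModel.
    List<TransformerSketch> fit(List<Double> input) {
        int lastEstimatorIdx = -1;
        for (int i = 0; i < stages.size(); i++) {
            if (stages.get(i) instanceof EstimatorSketch) {
                lastEstimatorIdx = i;
            }
        }
        List<TransformerSketch> transformers = new ArrayList<>();
        for (int i = 0; i < stages.size(); i++) {
            StageSketch stage = stages.get(i);
            // Estimators are fitted; Transformers are used as they are.
            TransformerSketch t = (stage instanceof EstimatorSketch)
                ? ((EstimatorSketch) stage).fit(input)
                : (TransformerSketch) stage;
            transformers.add(t);
            // Only stages before the last Estimator must produce data for later training.
            if (i < lastEstimatorIdx) {
                input = t.transform(input);
            }
        }
        return transformers;
    }

    // Applying the fitted model means running every transformer in order.
    static List<Double> transform(List<TransformerSketch> model, List<Double> input) {
        for (TransformerSketch t : model) {
            input = t.transform(input);
        }
        return input;
    }
}
```

Note the asymmetry: during fit, stages after the last Estimator are collected but never run, because no later stage needs their output for training.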

4.2 Overview of this example

This example has two stages: VectorAssembler is a Transformer and KMeans is an EstimatorBase.

At this point, the Pipeline's internal variables are:

this = {Pipeline@1195} 
 stages = {ArrayList@2851}  size = 2
     
  0 = {VectorAssembler@1198} 
   mapperBuilder = {VectorAssembler$lambda@2857} 
   params = {Params@2858} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
       
  1 = {KMeans@2856} 
   params = {Params@2860} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
    params = {HashMap@2862}  size = 6
  • The Pipeline first calls the Transformer VectorAssembler to process its input (the CSV BatchOperator). The processing is wired up via linkFrom(input), and the resulting MapBatchOp is returned and assigned back to input.
  • It then calls fit on KMeans (an EstimatorBase) with that input (the MapBatchOp returned by VectorAssembler). fit calls KMeansTrainBatchOp.linkFrom to set up training and produces a KMeansModel (a Transformer). Since KMeans is the last Estimator in the pipeline, KMeansModel.transform is not called at this point. KMeansModel is the trained model used for prediction.
  • Along the way, the Transformer obtained at each stage (the TransformerBase itself, or the model fitted from an EstimatorBase) is recorded in the transformers array.
  • Finally, a PipelineModel (also a Transformer) is created from the transformers array. The PipelineModel serves as a new pipeline for the subsequent transformation.

The PipelineModel's new pipeline reads and maps the CSV data (processed by VectorAssembler) and then transforms it with KMeansModel (described in the next section).

The code for fit is as follows:

public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {

    // Train the pipeline with batch data.
    public PipelineModel fit(BatchOperator input) {
        int lastEstimatorIdx = getIndexOfLastEstimator();
        TransformerBase[] transformers = new TransformerBase[stages.size()];
        for (int i = 0; i < stages.size(); i++) {
            PipelineStageBase stage = stages.get(i);
            if (i <= lastEstimatorIdx) {
                if (stage instanceof EstimatorBase) {
                    // The concrete algorithm operators on the pipeline are chained by linkFrom.
                    transformers[i] = ((EstimatorBase) stage).fit(input);
                } else if (stage instanceof TransformerBase) {
                    transformers[i] = (TransformerBase) stage;
                }
                // Note: the stage holding the last Estimator is not transformed.
                if (i < lastEstimatorIdx) {
                    // Calls the concrete Transformer's transform, which chains the
                    // algorithm operators on the pipeline via linkFrom.
                    input = transformers[i].transform(input);
                }
            } else {
                transformers[i] = (TransformerBase) stage;
            }
        }
        // A PipelineModel is created here, with the transformers passed as its argument.
        return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
    }
}

// MapTransformer is the base class of VectorAssembler. Its transform creates a MapBatchOp and then calls MapBatchOp.linkFrom.
public abstract class MapTransformer<T extends MapTransformer<T>>
		extends TransformerBase<T> implements LocalPredictable {
	@Override
	public BatchOperator transform(BatchOperator input) {
		return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
	}
}

// Trainer is the base class of KMeans.
public abstract class Trainer<T extends Trainer<T, M>, M extends ModelBase<M>>
	extends EstimatorBase<T, M> {
	@Override
	public M fit(BatchOperator input) {
		// For KMeans, train(input) is: new KMeansTrainBatchOp(this.getParams()).linkFrom(in);
		// createModel creates a new model; in this example it is com.alibaba.alink.pipeline.clustering.KMeansModel
		return createModel(train(input).getOutputTable());
	}
}

Each of these will be discussed below.

4.3 VectorAssembler.transform

This step converts the CSV data into the form required for KMeans training.

VectorAssembler.transform calls MapBatchOp.linkFrom. linkFrom takes the DataSet of the CSV input, maps it through a MapperAdapter that wraps the VectorAssemblerMapper, sets the result as the MapBatchOp's output, and returns the MapBatchOp. The business logic lives in VectorAssemblerMapper, which aggregates multiple numeric columns, in order, into a single vector column.

public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> {
    public T linkFrom(BatchOperator<?>... inputs) {
        BatchOperator in = checkAndGetFirst(inputs);

        try {
            Mapper mapper = (Mapper) this.mapperBuilder.apply(in.getSchema(), this.getParams());
            // Map the CSV input. This only builds the logical execution plan; it actually runs after print().
            DataSet<Row> resultRows = in.getDataSet().map(new MapperAdapter(mapper));
            TableSchema resultSchema = mapper.getOutputSchema();
            this.setOutput(resultRows, resultSchema);
            return (T) this;
        } catch (Exception var6) {
            throw new RuntimeException(var6);
        }
    }
}

// MapBatchOp itself
this = {MapBatchOp@3748} "UnnamedTable$1"
 mapperBuilder = {VectorAssembler$lambda@3744} 
 params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
 output = {TableImpl@5862} "UnnamedTable$1"
 sideOutputs = null
     
// Mapper is a business logic module
mapper = {VectorAssemblerMapper@5785} 
 handleInvalid = {VectorAssemblerMapper$HandleType@5813} "ERROR"
 outputColsHelper = {OutputColsHelper@5814} 
 colIndices = {int[4] @5815} 
 dataFieldNames = {String[5] @5816} 
 dataFieldTypes = {DataType[5] @5817} 
 params = {Params@5818} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
     
// Return the following value
resultRows = {MapOperator@5788} 
 function = {MapperAdapter@5826} 
  mapper = {VectorAssemblerMapper@5785} 
 defaultName = "linkFrom(MapBatchOp.java:35)"      
     
// Call stack is as follows

linkFrom:31, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
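The per-row work of VectorAssemblerMapper can be sketched as follows. This is a hypothetical simplification rather than Alink's code: rows are Object[] instead of Flink Rows, the assembled vector is a plain double[] appended after the existing columns, and only the default handleInvalid behavior ("error", i.e. fail on a null value) is modeled. VectorAssemblerSketch is an illustrative name.

```java
import java.util.Arrays;

// Hypothetical multi-input/single-output (MISO) mapper, loosely modeled on VectorAssemblerMapper.
final class VectorAssemblerSketch {
    private final int[] colIndices; // positions of the selected numeric columns

    VectorAssemblerSketch(String[] fieldNames, String[] selectedCols) {
        colIndices = new int[selectedCols.length];
        for (int i = 0; i < selectedCols.length; i++) {
            int idx = Arrays.asList(fieldNames).indexOf(selectedCols[i]);
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown column: " + selectedCols[i]);
            }
            colIndices[i] = idx;
        }
    }

    // Keeps the input columns and appends one vector built from the selected columns, in order.
    Object[] map(Object[] row) {
        double[] vector = new double[colIndices.length];
        for (int i = 0; i < colIndices.length; i++) {
            Object value = row[colIndices[i]];
            if (value == null) {
                // Mirrors handleInvalid = "error": a null value aborts the mapping.
                throw new IllegalArgumentException("Null value in column index " + colIndices[i]);
            }
            vector[i] = ((Number) value).doubleValue();
        }
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = vector;
        return out;
    }
}
```

The column positions are resolved once in the constructor, much as the real mapper receives the input schema up front, so each call to map only does index lookups.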

4.4 KMeans.fit

This step trains the model.

KMeans is a Trainer, which in turn extends EstimatorBase, so the pipeline calls its fit function.

KMeans.fit is in fact Trainer.fit:

  • Trainer.fit first calls train, which ends up calling KMeansTrainBatchOp.linkFrom and thereby links it to the VectorAssembler: KMeansTrainBatchOp processes the MapBatchOp returned by VectorAssembler, and train returns the KMeansTrainBatchOp itself.
  • Trainer.fit then calls Trainer.createModel, which decides which Model to create based on the type of this. For KMeans, a KMeansModel is created.

Because KMeans is the last Estimator in the pipeline, input = transformers[i].transform(input); is not executed for it. So at this point we are still training, and the result is the model KMeansModel.

// Relevant excerpts of the code

// Trainer.fit
public M fit(BatchOperator input) {
	return createModel(train(input).getOutputTable());
}

public final class KMeansTrainBatchOp extends BatchOperator<KMeansTrainBatchOp>
	implements KMeansTrainParams<KMeansTrainBatchOp> {

	public KMeansTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
		// ...
		DataSet<Row> finalCentroid = iterateICQ(initCentroid, data,
			vectorSize, maxIter, tol, distance, distanceType, vectorColName, null, null);
		this.setOutput(finalCentroid, new KMeansModelDataConverter().getModelSchema());
		return this;
	}
}

// Variable contents

this = {KMeansTrainBatchOp@5887} 
 params = {Params@5895} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
 output = null
 sideOutputs = null
inputs = {BatchOperator[1] @5888} 
 0 = {MapBatchOp@3748} "UnnamedTable$1"
  mapperBuilder = {VectorAssembler$lambda@3744} 
  params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
  output = {TableImpl@5862} "UnnamedTable$1"
  sideOutputs = null			
			
// Call stack is as follows
			
linkFrom:84, KMeansTrainBatchOp (com.alibaba.alink.operator.batch.clustering)
train:31, KMeans (com.alibaba.alink.pipeline.clustering)
fit:34, Trainer (com.alibaba.alink.pipeline)
fit:117, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)			

KMeansTrainBatchOp.linkFrom is the key to the algorithm: everything the algorithm needs is prepared here, and all the Flink operators are assembled here. We will come back to it later.

fit generates the KMeansModel. Its transform function, implemented in the base class MapModel, is called in the subsequent transform stage. KMeansModel is the trained KMeans model, and it is also a Transformer.

// Find the closest cluster center for every point.
public class KMeansModel extends MapModel<KMeansModel>
	implements KMeansPredictParams <KMeansModel> {

	public KMeansModel(Params params) {
		super(KMeansModelMapper::new, params);
	}
}

4.5 Generate a new transformation pipeline

As mentioned earlier, Pipeline.fit returns a PipelineModel, whose transform function then carries out the transform stage.

return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
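The following minimal plain-Java sketch of the Estimator/Transformer pattern makes the split between fit and transform concrete. It is not Alink's code. The interfaces `Transformer` and `Estimator`, the `fit`/`transform` helpers and the `doubler`/`centerer` stages are all hypothetical, and the data is a plain `List<Double>` rather than a BatchOperator. `fit` fits each estimator on the data seen so far and returns the fitted transformers, which is the role PipelineModel plays. `transform` then simply chains those transformers, as PipelineModel.transform does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Estimator/Transformer pipeline pattern; not Alink's API.
class PipelineSketch {
    // A Transformer maps one dataset to another.
    interface Transformer {
        List<Double> transform(List<Double> input);
    }

    // An Estimator learns from data and returns a fitted Transformer (a "model").
    interface Estimator {
        Transformer fit(List<Double> input);
    }

    // Fit each Estimator in order. Data is pushed through a stage only if more stages
    // follow it. The returned list plays the role of PipelineModel.
    static List<Transformer> fit(List<Object> stages, List<Double> input) {
        List<Transformer> fitted = new ArrayList<>();
        for (int i = 0; i < stages.size(); i++) {
            Object stage = stages.get(i);
            Transformer t = (stage instanceof Estimator)
                ? ((Estimator) stage).fit(input)
                : (Transformer) stage;
            fitted.add(t);
            if (i < stages.size() - 1) {
                input = t.transform(input);
            }
        }
        return fitted;
    }

    // Transform stage: chain every fitted Transformer, like PipelineModel.transform.
    static List<Double> transform(List<Transformer> model, List<Double> input) {
        for (Transformer t : model) {
            input = t.transform(input);
        }
        return input;
    }

    // Example Transformer: multiply every value by 2.
    static Transformer doubler() {
        return in -> {
            List<Double> out = new ArrayList<>();
            for (double x : in) out.add(2 * x);
            return out;
        };
    }

    // Example Estimator: learn the mean, return a model that subtracts it.
    static Estimator centerer() {
        return in -> {
            double mean = in.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
            return data -> {
                List<Double> out = new ArrayList<>();
                for (double x : data) out.add(x - mean);
                return out;
            };
        };
    }
}
```

Fitting `[doubler(), centerer()]` on [1, 2, 3] makes the centerer learn the mean of the doubled data (4.0). Transforming [1, 2, 3] with the fitted model then yields [-2.0, 0.0, 2.0].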

5. Transform stage

In the transform stage, the pipeline again starts with VectorAssembler, which maps the rows read from the CSV file, and then calls KMeansModel.

PipelineModel.transform calls transform on each stage in turn, turning the Transformers into a chain of BatchOperators. Its internal variables at this point are shown below. All the stages of various types in the original pipeline have become TransformerBase instances.

this = {PipelineModel@5016} 
 transformers = {TransformerBase[2] @5017} 

  0 = {VectorAssembler@1198} 
   mapperBuilder = {VectorAssembler$lambda@2855} 
   params = {Params@2856} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
     
  1 = {KMeansModel@5009} 
   mapperBuilder = {KMeansModel$lambda@5011} 
   modelData = {TableImpl@4984} "UnnamedTable$2"
   params = {Params@5012} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
 modelData = null
 params = {Params@5018} "Params {MLEnvironmentId=0}"
  • The first transform call is VectorAssembler.transform, which reaches MapBatchOp.linkFrom. It plays the same role as in the fit pipeline, as explained in the comments below.

  • The second transform call reaches ModelMapBatchOp.linkFrom, indirectly through KMeansModel.transform, as also explained in the comments below.

These two transform calls chain BatchOperators together. The final result is a ModelMapBatchOp, itself a BatchOperator, which performs the actual conversion.

// The model fitted by Pipeline.
public class PipelineModel extends ModelBase<PipelineModel> implements LocalPredictable {
    @Override
    public BatchOperator<?> transform(BatchOperator input) {
        for (TransformerBase transformer : this.transformers) {
            input = transformer.transform(input);
        }
        return input;
    }
}

// After the loop, we get the final BatchOperator, which performs the conversion.
// mapperBuilder {KMeansModel$lambda@5050} is the builder that creates a KMeansModelMapper (KMeansModelMapper::new).

input = {ModelMapBatchOp@5047} "UnnamedTable$3"
 mapperBuilder = {KMeansModel$lambda@5050} 
 params = {Params@5051} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
  params = {HashMap@5058}  size = 6
   "vectorCol" -> ""features""
   "maxIter" -> "100"
   "reservedCols" -> "["category"]"
   "k" -> "3"
   "predictionCol" -> ""prediction_result""
   "predictionDetailCol" -> ""prediction_detail""
 output = {TableImpl@5052} "UnnamedTable$3"
  tableEnvironment = {BatchTableEnvironmentImpl@5054} 
  operationTree = {DataSetQueryOperation@5055} 
  operationTreeBuilder = {OperationTreeBuilder@5056} 
  lookupResolver = {LookupCallResolver@5057} 
  tableName = "UnnamedTable$3"
 sideOutputs = null
    
// MapTransformer is the base class of VectorAssembler. Its transform creates a MapBatchOp and then calls MapBatchOp.linkFrom.
public abstract class MapTransformer<T extends MapTransformer<T>>
		extends TransformerBase<T> implements LocalPredictable {
	@Override
	public BatchOperator transform(BatchOperator input) {
		return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
	}
}

// MapModel is the base class of KMeansModel. Its transform creates a ModelMapBatchOp and then calls ModelMapBatchOp.linkFrom.
public abstract class MapModel<T extends MapModel<T>>
		extends ModelBase<T> implements LocalPredictable {
	@Override
	public BatchOperator transform(BatchOperator input) {
		return new ModelMapBatchOp(this.mapperBuilder, this.params)
				.linkFrom(BatchOperator.fromTable(this.getModelData())
					.setMLEnvironmentId(input.getMLEnvironmentId()), input);
	}
}

Each of the two linkFrom calls generates a Flink MapOperator, and these are chained together into a string of BatchOperators. For KMeansModel, ModelMapBatchOp.linkFrom wraps the mapper in a ModelMapperAdapter. ModelMapperAdapter is a RichMapFunction that stores the KMeansModelMapper as a member variable. linkFrom then calls .map(new ModelMapperAdapter(mapper, modelSource)). Since map is a Flink operator, this is where the transformation algorithm gets connected to Flink.

Finally, the KMeans transformation (prediction) itself is performed by KMeansModelMapper.map.
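A small, self-contained sketch can illustrate the split between algorithm logic and Flink. Everything in it is hypothetical. `MapFunction` is a local stand-in with the same shape as Flink's `MapFunction<IN, OUT>`, so the code runs without Flink. `Mapper`/`NearestCenterMapper` play the role of KMeansModelMapper, `MapperAdapter` plays the role of ModelMapperAdapter, and `runMap` imitates `dataSet.map(...)`. Using squared Euclidean distance for the prediction is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Mapper / adapter split; names echo Alink but this is not its API.
class AdapterSketch {
    // Local stand-in with the same shape as Flink's MapFunction<IN, OUT>.
    interface MapFunction<IN, OUT> {
        OUT map(IN value) throws Exception;
    }

    // Pure algorithm logic: knows nothing about the execution engine.
    interface Mapper {
        int map(double[] point);
    }

    // KMeans prediction: index of the nearest center (squared Euclidean distance).
    static class NearestCenterMapper implements Mapper {
        private final double[][] centers;

        NearestCenterMapper(double[][] centers) { this.centers = centers; }

        @Override
        public int map(double[] p) {
            int best = 0;
            double bestDist = Double.MAX_VALUE;
            for (int c = 0; c < centers.length; c++) {
                double d = 0;
                for (int i = 0; i < p.length; i++) {
                    double diff = p[i] - centers[c][i];
                    d += diff * diff;
                }
                if (d < bestDist) {
                    bestDist = d;
                    best = c;
                }
            }
            return best;
        }
    }

    // The adapter is the only class that implements the engine-facing interface.
    static class MapperAdapter implements MapFunction<double[], Integer> {
        private final Mapper mapper;

        MapperAdapter(Mapper mapper) { this.mapper = mapper; }

        @Override
        public Integer map(double[] value) { return mapper.map(value); }
    }

    // Stand-in for dataSet.map(fn): apply the function to every record.
    static <IN, OUT> List<OUT> runMap(List<IN> data, MapFunction<IN, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        try {
            for (IN value : data) {
                out.add(fn.map(value));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }
}
```

The design point is that `NearestCenterMapper` never sees an engine type. Only the thin adapter does, so the same mapper could in principle be wrapped for a different engine.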

6. Run

As we all know, for a Flink program to run, you need to:

  • Get the execution environment: call something like getExecutionEnvironment() to obtain it;
  • Trigger program execution: call something like env.execute("KMeans Example"); to actually run it.

Alink is itself just a Flink application, though a much more complicated one than a regular Flink application.

But the example code contains no such calls. This shows how well Alink encapsulates them. Still, as curious programmers, we want to know exactly where these calls are hidden.

Obtaining the Execution Environment

Alink obtains the execution environment while the pipeline is running. Specifically, the CSV file is the initial input, so the environment is retrieved when transform (through MapBatchOp.linkFrom) calls in.getSchema() on the CSV source. The call stack below shows this path.

public final class CsvSourceBatchOp extends BaseSourceBatchOp<CsvSourceBatchOp>
    implements CsvSourceParams<CsvSourceBatchOp> {
    @Override
    public Table initializeDataSource() {
      ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
      // ... builds the CSV source on execEnv and returns it as a Table (omitted)
    }
}

initializeDataSource:77, CsvSourceBatchOp (com.alibaba.alink.operator.batch.source)
getOutputTable:52, BaseSourceBatchOp (com.alibaba.alink.operator.batch.source)
getSchema:180, AlgoOperator (com.alibaba.alink.operator)
linkFrom:34, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)

Trigger program run

So far, Alink has done a lot of work that maps onto Flink operators. So where does it actually connect to Flink?

The final print call, BatchOperator.print, is where things really start. It descends layer by layer until it reaches:

package com.alibaba.alink.operator.batch.utils;

public class PrintBatchOp extends BaseSinkBatchOp<PrintBatchOp> {
	@Override
	protected PrintBatchOp sinkFrom(BatchOperator in) {
		this.setOutputTable(in.getOutputTable());
		if (null != this.getOutputTable()) {
			try {
				// After this collect, the Flink runtime will be entered.
				List<Row> rows = DataSetConversionUtil.fromTable(getMLEnvironmentId(), this.getOutputTable()).collect();
				batchPrintStream.println(TableUtil.formatTitle(this.getColNames()));
				for (Row row : rows) {
					batchPrintStream.println(TableUtil.formatRows(row));
				}
			} catch (Exception ex) {
				throw new RuntimeException(ex);
			}
		}
		return this;
	}
}

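Alink finally connects to Flink's runtime here, in Flink's LocalEnvironment. The excerpt below is LocalEnvironment.getExecutionPlan. According to the call stack, DataSet.collect actually enters LocalEnvironment.execute.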

public class LocalEnvironment extends ExecutionEnvironment {
	@Override
	public String getExecutionPlan() throws Exception {
		Plan p = createProgramPlan(null, false);

		// The following will actually be associated with Flink.
		if (executor != null) {
			return executor.getOptimizerPlanAsJSON(p);
		}
		else {
			PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
			return tempExecutor.getOptimizerPlanAsJSON(p);
		}
	}
}

// Call stack is as follows

execute:91, LocalEnvironment (org.apache.flink.api.java)
execute:820, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:40, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:18, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:31, KMeansExample (com.alibaba.alink)
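The key point of this section is that the job is built lazily. Operators such as map only add to a plan, and nothing runs until a sink such as collect() triggers execute. The following plain-Java sketch imitates that ordering. `LazyDataSet`, `fromList` and the `LOG` list are hypothetical and only borrow the shape of Flink's DataSet API. It is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of lazy plan building; mimics the shape of Flink's DataSet API only.
class LazyPlanSketch {
    // Records the order in which work actually happens.
    static final List<String> LOG = new ArrayList<>();

    static class LazyDataSet<T> {
        private final Supplier<List<T>> plan; // deferred computation, not yet run

        LazyDataSet(Supplier<List<T>> plan) { this.plan = plan; }

        static <E> LazyDataSet<E> fromList(List<E> data) {
            return new LazyDataSet<E>(() -> {
                LOG.add("source");
                return new ArrayList<>(data);
            });
        }

        // Like DataSet.map: only extends the plan, nothing executes here.
        <R> LazyDataSet<R> map(Function<T, R> fn) {
            return new LazyDataSet<R>(() -> {
                List<T> in = plan.get();
                LOG.add("map");
                List<R> out = new ArrayList<>();
                for (T value : in) out.add(fn.apply(value));
                return out;
            });
        }

        // Like DataSet.collect: the sink that triggers execution of the whole plan.
        List<T> collect() {
            LOG.add("execute");
            return plan.get();
        }
    }
}
```

Building `fromList(List.of(1, 2, 3)).map(x -> x * 10)` leaves the log empty. Only `collect()` records `execute` and then runs the source and the map, returning [10, 20, 30].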

0x05 Low-level — Iterative computing framework

The design principles are as follows:

  • Build a middleware or adapter that blocks Flink, takes advantage of it, and allows users to quickly develop algorithms based on it.
  • Adopt the simplest and most common development language and development mode.

Let’s think about the groundwork that needs to be done:

  • How to initialize
  • How to communicate
  • How to split and broadcast the code
  • How to split and broadcast the data
  • How to iterate an algorithm

The most important concept is IterativeComQueue. It abstracts each communication or computation step as a ComQueueItem and strings the items together into a queue. This forms a communication/computation framework for iterative computing scenarios.
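The following single-JVM sketch in plain Java shows the idea of a queue of computation and communication steps. It simplifies under stated assumptions and is not Alink's implementation. The "workers" are just `Context` objects in a list. The only communication item is an elementwise-sum all-reduce over a `double[]`. The names `ComQueueSketch`, `Compute`, `AllReduceSum` and `twoMeans` are hypothetical, though they echo Alink's `calc`, `getObj`/`putObj`, `setCompareCriterionOfNode0` and `setMaxIter`. The example runs 1-D k-means with k = 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical single-JVM sketch of an iterative compute/communicate queue.
// Names echo Alink (calc, getObj/putObj, setMaxIter, ...) but this is NOT Alink's code.
class ComQueueSketch {
    // Per-worker state, loosely modeled on ComContext.getObj/putObj.
    static class Context {
        final Map<String, Object> objs = new HashMap<>();
        @SuppressWarnings("unchecked")
        <T> T getObj(String key) { return (T) objs.get(key); }
        void putObj(String key, Object value) { objs.put(key, value); }
    }

    interface Item {}
    // A computation step, run on every worker against that worker's own context.
    interface Compute extends Item { void calc(Context ctx); }

    // A communication step: elementwise-sum the double[] stored under `key` on all workers.
    static class AllReduceSum implements Item {
        final String key;
        AllReduceSum(String key) { this.key = key; }
        void run(List<Context> workers) {
            double[] total = null;
            for (Context c : workers) {
                double[] local = c.getObj(key);
                if (total == null) total = new double[local.length];
                for (int i = 0; i < local.length; i++) total[i] += local[i];
            }
            for (Context c : workers) c.putObj(key, total.clone());
        }
    }

    private final List<Item> queue = new ArrayList<>();
    private Predicate<Context> stopOnNode0 = ctx -> false;
    private int maxIter = 10;

    ComQueueSketch add(Item item) { queue.add(item); return this; }
    ComQueueSketch setCompareCriterionOfNode0(Predicate<Context> p) { stopOnNode0 = p; return this; }
    ComQueueSketch setMaxIter(int n) { maxIter = n; return this; }

    // Each iteration runs the whole queue; the stop criterion is checked on worker 0.
    void exec(List<Context> workers) {
        for (int step = 1; step <= maxIter; step++) {
            for (Item item : queue) {
                if (item instanceof Compute) {
                    for (Context c : workers) ((Compute) item).calc(c);
                } else {
                    ((AllReduceSum) item).run(workers);
                }
            }
            if (stopOnNode0.test(workers.get(0))) return;
        }
    }

    // Usage: 1-D k-means with k = 2, the data split across workers.
    static double[] twoMeans(double[][] partitions, double c0, double c1) {
        List<Context> workers = new ArrayList<>();
        for (double[] part : partitions) {
            Context w = new Context();
            w.putObj("data", part);
            w.putObj("centers", new double[] {c0, c1});
            workers.add(w);
        }
        Compute assign = ctx -> {
            double[] data = ctx.getObj("data");
            double[] centers = ctx.getObj("centers");
            double[] stats = new double[4]; // sum0, count0, sum1, count1
            for (double x : data) {
                int c = Math.abs(x - centers[0]) <= Math.abs(x - centers[1]) ? 0 : 1;
                stats[2 * c] += x;
                stats[2 * c + 1] += 1;
            }
            ctx.putObj("stats", stats);
        };
        Compute update = ctx -> {
            double[] old = ctx.getObj("centers");
            double[] stats = ctx.getObj("stats");
            double[] next = old.clone();
            for (int c = 0; c < 2; c++) {
                if (stats[2 * c + 1] > 0) next[c] = stats[2 * c] / stats[2 * c + 1];
            }
            ctx.putObj("moved", Math.abs(next[0] - old[0]) + Math.abs(next[1] - old[1]));
            ctx.putObj("centers", next);
        };
        new ComQueueSketch().add(assign).add(new AllReduceSum("stats")).add(update)
            .setCompareCriterionOfNode0(ctx -> (Double) ctx.getObj("moved") < 1e-9)
            .setMaxIter(100)
            .exec(workers);
        return workers.get(0).getObj("centers");
    }
}
```

Take partitions {1, 2, 3}, {10} and {11, 12} with starting centers 0 and 5. The centers move to (1.5, 9) and then to (2, 11). The loop stops at the third iteration because the centers no longer move.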

Listing the directory structure here again:

./java/com/alibaba/alink/common:
MLEnvironment.java    MLEnvironmentFactory.java    VectorTypes.java
comqueue    io    linalg    mapper    model    utils

Roughly, these are:

  • Flink encapsulation module: MLEnvironment.java, MLEnvironmentFactory.java.
  • Linear algebra module: linalg.
  • Computation/communication queue module: comqueue, where ComputeFunction performs computations, such as those of training algorithms.
  • Mapping module: mapper, whose Mappers perform various mappings. For example, ModelMapper uses a model to map input data to results (that is, the transformation algorithms).
  • Model module: model, mainly used to read model sources.
  • Basic modules: utils, io.

The algorithm component does the following in its linkFrom function:

  • Some initialization is done first. At this point some Flink operators, such as groupBy, are called directly.
  • The algorithm logic is stripped out and delegated to a Mapper or to ComQueueItems.
  • The Mapper or ComQueueItems are run through Flink map or mapPartition operators.
  • In other words, calling Flink operators means splitting the algorithm into pieces and adapting each piece to Flink.

Here are the details.

1. Flink context encapsulation

MLEnvironment is an important class. It encapsulates the runtime context needed for Flink development. Through it, users can obtain the actual execution environments, create tables, and run SQL statements.

/**
 * The MLEnvironment stores the necessary context in Flink.
 * Each MLEnvironment will be associated with a unique ID.
 * The operations associated with the same MLEnvironment ID
 * will share the same Flink job context.
 */
public class MLEnvironment {
    private ExecutionEnvironment env;
    private StreamExecutionEnvironment streamEnv;
    private BatchTableEnvironment batchTableEnv;
    private StreamTableEnvironment streamTableEnv;
}
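According to the comment above, each MLEnvironment is tied to a unique ID, and operations with the same ID share one Flink job context. A registry of that shape can be sketched as follows. This is a hypothetical illustration, not MLEnvironmentFactory's code. `EnvRegistrySketch`, `JobContext`, `get` and `register` are invented names. `JobContext` is an empty placeholder, whereas the real class holds the Flink execution and table environments. Treating ID 0 as the default mirrors the `MLEnvironmentId=0` seen in the PipelineModel parameters earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical ID -> context registry in the spirit of MLEnvironmentFactory; not Alink's code.
class EnvRegistrySketch {
    // Placeholder for MLEnvironment, which would hold the Flink (table) environments.
    static class JobContext {
        final long id;
        JobContext(long id) { this.id = id; }
    }

    static final long DEFAULT_ID = 0L;
    private static final AtomicLong NEXT_ID = new AtomicLong(DEFAULT_ID + 1);
    private static final Map<Long, JobContext> CONTEXTS = new ConcurrentHashMap<>();

    // Same ID -> same context object, so operators built with it share one job context.
    static JobContext get(long id) {
        if (id == DEFAULT_ID) {
            return CONTEXTS.computeIfAbsent(id, JobContext::new); // default is created lazily
        }
        JobContext ctx = CONTEXTS.get(id);
        if (ctx == null) {
            throw new IllegalArgumentException("Unknown environment id: " + id);
        }
        return ctx;
    }

    // Register a fresh context under a new ID and return that ID.
    static long register() {
        long id = NEXT_ID.getAndIncrement();
        CONTEXTS.put(id, new JobContext(id));
        return id;
    }
}
```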

2. Function

A Function is the smallest building block for business logic, such as computation or communication, in the computing framework. The kinds of Function are defined as follows.

  • A ComputeFunction is a computation module.
  • A CommunicateFunction is a communication module. CommunicateFunction and ComputeFunction both implement ComQueueItem and carry the business logic.
  • A CompareCriterionFunction is a judgment module that decides when to terminate the loop, which lets the user specify the iteration's termination condition.
  • A CompleteResultFunction is called when the loop ends, to produce the loop's result.
  • A Mapper is also a Function, namely a Mapper Function.

Below, these are collectively referred to as Function.

/**
 * Basic build block in {@link BaseComQueue}, for either communication or computation.
 */
public interface ComQueueItem extends Serializable {}

/** * An BaseComQueue item for computation. */
public abstract class ComputeFunction implements ComQueueItem {

	/**
	 * Perform the computation work.
	 *
	 * @param context to get input object and update output object.
	 */
	public abstract void calc(ComContext context);
}

/** * An BaseComQueue item for communication. */
public abstract class CommunicateFunction implements ComQueueItem {

    /**
     * Perform communication work.
     *
     * @param input     output of previous queue item.
     * @param sessionId session id for shared objects.
     * @param <T>       Type of dataset.
     * @return result dataset.
     */
	public abstract <T> DataSet <T> communicateWith(DataSet <T> input, int sessionId);
}

Back to our code: part of what the KMeansTrainBatchOp algorithm component does is split the KMeans algorithm into several Functions and add them to the computation/communication queue.

In the code below, the specific items are:

  • ComputeFunction: KMeansPreallocateCentroid, KMeansAssignCluster, KMeansUpdateCentroids
  • CommunicateFunction: AllReduce
  • CompareCriterionFunction: KMeansIterTermination
  • CompleteResultFunction: KMeansOutputModel

That is, the main work of implementing the algorithm is:

  • Build an IterativeComQueue.
  • Initialize the data, in one of two ways: initWithPartitionedData caches DataSet fragments in memory, while initWithBroadcastData caches the whole DataSet in the memory of each worker.
  • Divide the computation into several ComputeFunctions, chained in the IterativeComQueue.
  • Use the AllReduce communication model to synchronize data.
	static DataSet<Row> iterateICQ(/* ... parameters omitted ... */) {
		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}

3. Computation/communication queue

BaseComQueue is the foundation of this iteration framework. It maintains a List<ComQueueItem> queue. When users build an algorithm module, they add various Functions to this queue.

IterativeComQueue is the default implementation of BaseComQueue. Specifically, it implements the two functions setMaxIter and setCompareCriterionOfNode0.

Two important BaseComQueue functions are:

  • optimize: chains adjacent ComputeFunctions in the queue into a single ChainedComputation. Optimizations like this inside the framework are one of Alink's strengths.
  • exec: runs each Function in the queue and returns the final DataSet. This is where Flink really comes in. Each ComputeFunction in the queue is mapped to a Flink RichMapPartitionFunction, and inside its mapPartition call the actual piece of algorithm logic, computation.calc(context), is invoked.

Think of BaseComQueue as a logical concept that helps algorithm engineers organize their business logic. The exec function maps that logic onto Flink operators, which decouples the algorithm from Flink to some extent.
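The chaining done by optimize() can be reproduced in isolation. The following sketch applies the same in-place two-pointer pass to a list of toy items. `ChainSketch`, `Compute`, `Communicate` and `Chained` are hypothetical stand-ins for ComputeFunction, CommunicateFunction and ChainedComputation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the chaining pass in BaseComQueue.optimize(); toy types only.
class ChainSketch {
    interface Item {}

    // Stand-in for ComputeFunction.
    static class Compute implements Item {
        final String name;
        Compute(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Stand-in for CommunicateFunction.
    static class Communicate implements Item {
        final String name;
        Communicate(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Stand-in for ChainedComputation: several computations executed as one step.
    static class Chained extends Compute {
        final List<Compute> parts = new ArrayList<>();
        Chained() { super("chain"); }
        Chained add(Compute c) { parts.add(c); return this; }
        @Override public String toString() { return parts.toString(); }
    }

    // Merge every run of adjacent Compute items into a single Chained item, in place.
    static void optimize(List<Item> queue) {
        if (queue.isEmpty()) return;
        int current = 0;
        for (int ahead = 1; ahead < queue.size(); ++ahead) {
            Item cur = queue.get(current);
            Item next = queue.get(ahead);
            if (next instanceof Compute && cur instanceof Compute) {
                if (cur instanceof Chained) {
                    ((Chained) cur).add((Compute) next);
                } else {
                    queue.set(current, new Chained().add((Compute) cur).add((Compute) next));
                }
            } else {
                queue.set(++current, next);
            }
        }
        queue.subList(current + 1, queue.size()).clear();
    }
}
```

Take the KMeans queue [Preallocate, Assign, AllReduce, Update], with names shortened. The first two computations collapse into one chain, giving [[Preallocate, Assign], AllReduce, Update]. Since exec turns each remaining ComputeFunction into its own mapPartition, this saves one operator per iteration.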

The definition (excerpting parts of the function bodies) is as follows:

// Base class for the com(Computation && Communicate) queue.
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {

	/** All computation or communication functions. */
	private final List<ComQueueItem> queue = new ArrayList<>();

	/** The function executed to decide whether to break the loop. */
	private CompareCriterionFunction compareCriterion;

	/** The function executed when closing the iteration. */
	private CompleteResultFunction completeResult;

	private void optimize() {
		if (queue.isEmpty()) {
			return;
		}

		int current = 0;
		for (int ahead = 1; ahead < queue.size(); ++ahead) {
			ComQueueItem curItem = queue.get(current);
			ComQueueItem aheadItem = queue.get(ahead);

            // If both the current and the next item are ComputeFunctions, merge them into a ChainedComputation
			if (aheadItem instanceof ComputeFunction && curItem instanceof ComputeFunction) {
				if (curItem instanceof ChainedComputation) {
					queue.set(current, ((ChainedComputation) curItem).add((ComputeFunction) aheadItem));
				} else {
					queue.set(current, new ChainedComputation()
						.add((ComputeFunction) curItem)
						.add((ComputeFunction) aheadItem));
				}
			} else {
				queue.set(++current, aheadItem);
			}
		}

		queue.subList(current + 1, queue.size()).clear();
	}    
    
	/**
	 * Execute the BaseComQueue and get the result dataset.
	 *
	 * @return result dataset.
	 */
	public DataSet<Row> exec() {
        
		optimize();

		IterativeDataSet<byte[]> loop
			= loopStartDataSet(executionEnvironment)
			.iterate(maxIter);

		DataSet<byte[]> input = loop
			.mapPartition(new DistributeData(cacheDataObjNames, sessionId))
			.withBroadcastSet(loop, "barrier")
			.name("distribute data");

		for (ComQueueItem com : queue) {
			if ((com instanceof CommunicateFunction)) {
				CommunicateFunction communication = ((CommunicateFunction) com);         
         // e.g. AllReduce.communicateWith wraps the input, and the result is assigned back to input. When the loop reaches the next ComputeFunction (KMeansUpdateCentroids), this input is handed to it for processing. For example, input = {MapPartitionOperator@5248} with input.function = {AllReduce$AllReduceRecv@5260}, which is how KMeansUpdateCentroids is reached indirectly.
				input = communication.communicateWith(input, sessionId);
			} else if (com instanceof ComputeFunction) {
				final ComputeFunction computation = (ComputeFunction) com;

        // Map each ComputeFunction on the calculation queue to Flink's RichMapPartitionFunction.
				input = input
						.mapPartition(new RichMapPartitionFunction<byte[], byte[]>() {
						@Override
						public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
							ComContext context = new ComContext(
								sessionId, getIterationRuntimeContext()
							);
              // Here Flink calls the concrete computation, i.e. the algorithm fragment split out by the algorithm engineer.
							computation.calc(context);
						}
					})
					.withBroadcastSet(input, "barrier")
					.name(com instanceof ChainedComputation ?
						((ChainedComputation) com).name()
						: "computation@" + computation.getClass().getSimpleName());
			} else {
				throw new RuntimeException("Unsupported op in iterative queue.");
			}
		}

		// ... closing the iteration (producing loopEnd) is omitted in this excerpt ...
		return serializeModel(clearObjs(loopEnd));
	}
}

4. Mapper (Function)

Mapper is also part of the underlying iterative computing framework and can be regarded as a Mapper Function. Since it involves business logic, it was explained earlier.

5. Initialization

Initialization happens in KMeansTrainBatchOp.linkFrom. This code (for example, rebalance().map()) has not yet entered the framework. The computation is therefore entirely under the user's control and does not need to be added to the IterativeComQueue.

Suppose a computation were added to the IterativeComQueue and you also applied Flink operators to it yourself. The framework would then be stuck, not knowing what to do. So this freedom exists only before the framework takes over.

	@Override
	public KMeansTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
		// ...
		DataSet<FastDistanceVectorData> data = statistics.f0.rebalance().map(
			new MapFunction<Vector, FastDistanceVectorData>() {
				@Override
				public FastDistanceVectorData map(Vector value) {
					return distance.prepareVectorData(Row.of(value), 0);
				}
			});
		// ...
	}

The framework also provides initialization functions that cache a DataSet in memory, in two modes: partition and broadcast. The former caches the DataSet in memory as fragments, while the latter caches the whole DataSet in the memory of every worker.

		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
            ......
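The difference between the two caching modes is easy to see in a toy model where "workers" are just lists. This is a hypothetical sketch. `InitSketch`, `partition` and `broadcast` are invented names. The round-robin split is an assumption chosen to resemble the `rebalance()` call in linkFrom above, and it is not necessarily how the framework assigns fragments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two caching modes; "workers" are plain lists here.
class InitSketch {
    // Partitioned: each worker keeps only its own slice (round-robin split, an assumption).
    static <T> List<List<T>> partition(List<T> data, int numWorkers) {
        List<List<T>> slices = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            slices.add(new ArrayList<>());
        }
        for (int i = 0; i < data.size(); i++) {
            slices.get(i % numWorkers).add(data.get(i));
        }
        return slices;
    }

    // Broadcast: every worker keeps a full copy of the data.
    static <T> List<List<T>> broadcast(List<T> data, int numWorkers) {
        List<List<T>> copies = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            copies.add(new ArrayList<>(data));
        }
        return copies;
    }
}
```

With 5 training rows and 2 workers, `partition` gives the workers [1, 3, 5] and [2, 4], while `broadcast` gives every worker all of the initial centroids. This matches the KMeans setup: TRAIN_DATA is partitioned because each worker only needs its share, while INIT_CENTROID and KMEANS_STATISTICS are broadcast because every worker needs all of them.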

6. ComputeFunction

This is where the concrete computation of the algorithm lives. The algorithm engineer divides the algorithm into modules that can be processed in parallel and implements each of them as a ComputeFunction, so as to benefit from Flink's distributed computing.

The following code finds the nearest cluster center for each point. For each cluster, it also counts the points and sums their coordinates:

/** * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */
public class KMeansAssignCluster extends ComputeFunction {
    private FastDistance fastDistance;
    private transient DenseMatrix distanceMatrix;

    @Override
    public void calc(ComContext context) {
        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);
        // get iterative coefficient from static memory.
        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }

        if (null == distanceMatrix) {
            distanceMatrix = new DenseMatrix(k, 1);
        }

        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }

        Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
        if (trainData == null) {
            return;
        }

        Arrays.fill(sumMatrixData, 0.0);
        for (FastDistanceVectorData sample : trainData) {
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance, distanceMatrix);
        }
    }
}
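Stripped of Alink's FastDistance and matrix helpers, the assign step boils down to the following plain-Java sketch. It is hypothetical: `AssignSketch`, `assignAndSum`, `nearest` and `updateCentroids` are invented names, and distances are squared Euclidean. The flat layout of the sums array is also an assumption: for cluster c, `dim` coordinate sums followed by a point count, starting at `c * (dim + 1)`. This layout matches the `k * (vectorSize + 1)` size of `sumMatrixData`, but not necessarily its exact ordering. `updateCentroids` shows what a KMeansUpdateCentroids-style step could do once the sums have been all-reduced.

```java
// Hypothetical sketch of one k-means assign step plus the follow-up update step.
// Assumed layout: cluster c uses sums[c*(dim+1) .. c*(dim+1)+dim-1] for coordinate
// sums and sums[c*(dim+1)+dim] for its point count.
class AssignSketch {
    // Index of the centroid with the smallest squared Euclidean distance to p.
    static int nearest(double[] p, double[][] centroids) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centroids.length; c++) {
            double d = 0;
            for (int i = 0; i < p.length; i++) {
                double diff = p[i] - centroids[c][i];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    // Local work on one partition: assign each point, accumulate sums and counts.
    static double[] assignAndSum(double[][] points, double[][] centroids) {
        int k = centroids.length;
        int dim = centroids[0].length;
        double[] sums = new double[k * (dim + 1)];
        for (double[] p : points) {
            int base = nearest(p, centroids) * (dim + 1);
            for (int i = 0; i < dim; i++) {
                sums[base + i] += p[i];
            }
            sums[base + dim] += 1; // point count
        }
        return sums;
    }

    // After the all-reduce: new centroid = coordinate sums / count (keep the old one if empty).
    static double[][] updateCentroids(double[] sums, double[][] old) {
        int dim = old[0].length;
        double[][] next = new double[old.length][];
        for (int c = 0; c < old.length; c++) {
            int base = c * (dim + 1);
            double count = sums[base + dim];
            next[c] = old[c].clone();
            if (count > 0) {
                for (int i = 0; i < dim; i++) {
                    next[c][i] = sums[base + i] / count;
                }
            }
        }
        return next;
    }
}
```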

As this shows, ComputeFunction uses an imperative programming style. That style fits the way most programmers already work and can greatly improve productivity.

7. CommunicateFunction

Look at .add(new AllReduce(CENTROID_ALL_REDUCE)). This item connects what comes before it with what comes after it. Through it, the preceding KMeansPreallocateCentroid and KMeansAssignCluster and the subsequent KMeansUpdateCentroids exchange data with a reduce/broadcast communication.

As the comments in its source indicate, AllReduce is an implementation of the MPI all-reduce communication primitive. Here, reduce/broadcast is applied to a double[] object.

public class AllReduce extends CommunicateFunction {
	public static <T> DataSet <T> allReduce(
		DataSet <T> input,
		final String bufferName,
		final String lengthName,
		final SerializableBiConsumer<double[], double[]> op,
		final int sessionId) {
		final String transferBufferName = UUID.randomUUID().toString();

		return input
			.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
			.withBroadcastSet(input, "barrier")
			.returns(
				new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
			.name("AllReduceSend")
			.partitionCustom(new Partitioner <Integer>() {
				@Override
				public int partition(Integer key, int numPartitions) {
					return key;
				}
			}, 0)
			.name("AllReduceBroadcastRaw")
			.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
			.returns(
				new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
			.name("AllReduceSum")
			.partitionCustom(new Partitioner <Integer>() {
				@Override
				public int partition(Integer key, int numPartitions) {
					return key;
				}
			}, 0)
			.name("AllReduceBroadcastSum")
			.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
			.returns(input.getType())
			.name("AllReduceRecv");
	}
}

After debugging, we can see that AllReduceSum calls SUM in its own mapPartition implementation.

	/** The all-reduce operation which does elementwise sum operation. */
	public final static SerializableBiConsumer<double[], double[]> SUM
		= new SerializableBiConsumer<double[], double[]>() {
		@Override
		public void accept(double[] a, double[] b) {
			for (int i = 0; i < a.length; ++i) {
				a[i] += b[i];
			}
		}
	};

	private static class AllReduceSum
		extends RichMapPartitionFunction<Tuple3<Integer, Integer, double[]>, Tuple3<Integer, Integer, double[]>> {
		@Override
		public void mapPartition(Iterable<Tuple3<Integer, Integer, double[]>> values,
								 Collector<Tuple3<Integer, Integer, double[]>> out) {

			// Various initialization steps are omitted, such as determining the transfer positions and destinations.
			do {
				Tuple3<Integer, Integer, double[]> val = it.next();
				int localPos = val.f1 - startPos;
				if (sum[localPos] == null) {
					sum[localPos] = val.f2;
					agg[localPos]++;
				} else {
					// SUM is called here
					op.accept(sum[localPos], val.f2);
				}
			} while (it.hasNext());

			for (int i = 0; i < numOfSubTasks; ++i) {
				for (int j = 0; j < cnt; ++j) {
					out.collect(Tuple3.of(i, startPos + j, sum[j]));
				}
			}
		}
	}

accept:129, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
accept:126, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
mapPartition:314, AllReduce$AllReduceSum (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)
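The three stages of allReduce are AllReduceSend, AllReduceSum and AllReduceRecv, connected by partitionCustom on the target task index. This shape resembles a reduce-scatter followed by an all-gather. The following single-JVM simulation illustrates that pattern under simplifying assumptions. Each task's buffer is split into contiguous chunks, task j is responsible for summing chunk j, and every task then collects all summed chunks. `AllReduceSketch`, `allReduceSum` and `chunkStart` are hypothetical names, and no networking or Flink is involved.

```java
// Hypothetical single-JVM simulation of a chunked all-reduce (sum); not Alink's code.
class AllReduceSketch {
    // Start index of chunk j when [0, len) is cut into n nearly equal pieces.
    static int chunkStart(int j, int n, int len) {
        return (int) ((long) len * j / n);
    }

    // buffers[t] is task t's local array; all arrays have the same length.
    // Returns what every task holds afterwards: the elementwise sum of all buffers.
    static double[][] allReduceSum(double[][] buffers) {
        int n = buffers.length;
        int len = buffers[0].length;

        // Phase 1 (send + sum): task j receives chunk j from every task and sums it.
        double[][] summedChunks = new double[n][];
        for (int j = 0; j < n; j++) {
            int start = chunkStart(j, n, len);
            int end = chunkStart(j + 1, n, len);
            double[] acc = new double[end - start];
            for (double[] buffer : buffers) {
                for (int i = start; i < end; i++) {
                    acc[i - start] += buffer[i]; // elementwise sum, like AllReduce.SUM
                }
            }
            summedChunks[j] = acc;
        }

        // Phase 2 (broadcast + recv): every task copies all summed chunks into place.
        double[][] result = new double[n][len];
        for (int t = 0; t < n; t++) {
            for (int j = 0; j < n; j++) {
                System.arraycopy(summedChunks[j], 0, result[t], chunkStart(j, n, len), summedChunks[j].length);
            }
        }
        return result;
    }
}
```

Suppose two tasks hold {1, 2, 3, 4, 5} and {10, 20, 30, 40, 50}. Task 0 sums positions 0 to 1, task 1 sums positions 2 to 4, and both end up with {11, 22, 33, 44, 55}. Splitting the work this way means no single task has to sum the whole buffer.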

0x06 Another way to play

So far, we have found that the iterative computing framework is well designed. But Alink doesn’t limit you to using this framework to implement algorithms. If you’re a Flink expert, you can do whatever you want.

Alink's own examples include one such implementation, ALSExample. Its core class, AlsTrainBatchOp, uses Flink operators and IterativeDataSet directly.

It is like Wu Song, the hero of Water Margin: he wields his pair of sabres masterfully, yet with his bare fists he can still kill the fierce white-browed tiger.

public final class AlsTrainBatchOp
    extends BatchOperator<AlsTrainBatchOp>
    implements AlsTrainParams<AlsTrainBatchOp> {

    @Override
    public AlsTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
        BatchOperator<?> in = checkAndGetFirst(inputs);
        // ...
        AlsTrain als = new AlsTrain(rank, numIter, lambda, implicitPrefs, alpha, numMiniBatches, nonNegative);
        DataSet<Tuple3<Byte, Long, float[]>> factors = als.fit(alsInput);

        DataSet<Row> output = factors.mapPartition(new RichMapPartitionFunction<Tuple3<Byte, Long, float[]>, Row>() {
            @Override
            public void mapPartition(Iterable<Tuple3<Byte, Long, float[]>> values, Collector<Row> out) {
                new AlsModelDataConverter(userColName, itemColName).save(values, out);
            }
        });
        return this;
    }
}

For comparison, Flink ML also has an ALS implementation, written in Scala. Algorithm engineers without Scala experience will find that code hard going.

0x07 Summary

Based on the speculation and verification in these two articles, we draw the following conclusions.

Part of Alink’s design principles

  • Algorithms belong to algorithms, Flink belongs to Flink, try to shield the connection between AI algorithm and Flink.

  • Adopt the simplest, most common development language and way of thinking.

  • Try to learn from common machine learning design ideas and development modes in the market, so that developers can switch seamlessly.

  • Build a middleware or adapter that blocks Flink, takes advantage of it, and allows users to quickly develop algorithms based on it.

Alink implements these principles through:

  • The top-level pipeline: Estimator, Transformer…
  • The middle tier of algorithm components
  • The underlying iterative computing framework

In this way, Alink makes the most of Flink's strengths while fitting the way algorithm engineers already work. This makes their job easier and improves both system performance and productivity.

The next article will try to introduce a concrete implementation of AllReduce.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want timely updates on new articles, or want to see the technical material I recommend, please follow the account.
