Flink is best known for real-time processing. It treats batch processing as a special case of stream processing, so a single engine can handle both batch and streaming data, and the project plans to invest further in unifying the two. I introduced the DataStream API in the Flink DataStream API programming guide. This article introduces the DataSet API, Flink's batch-processing API. It covers:

  • DataSet transformations
  • Using sources and sinks
  • Broadcast variables: basic concept and a usage demo
  • Distributed cache: concept and a usage demo
  • Demo cases for the DataSet API transformations

WordCount sample

Before introducing the DataSet API, let's look at a simple Word Count example to get an intuitive feel for its programming model. The code is as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // The execution environment for batch processing
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataSource<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");

        // Transformation
        AggregateOperator<Tuple2<String, Integer>> wordCnt = stringDataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split the line into words on single spaces
                        String[] split = value.split(" ");
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);

        // Output
        wordCnt.print();
    }
}

As you can see from the example above, the basic programming model is:

  • Get the ExecutionEnvironment for batch processing
  • Load a data source
  • Apply transformations
  • Output the data

The data source, transformation operation, and data output are explained below.

Data Source

The DataSet API can read batch data from several kinds of data sources into Flink and turn it into a DataSet. There are three main kinds: file-based, collection-based, and generic data sources. The API also lets you implement the InputFormat/RichInputFormat interfaces yourself to connect to other kinds of data sources and formats; existing formats include CsvInputFormat and TextInputFormat. The methods of the ExecutionEnvironment class show which data sources are supported.

File-based data source

readTextFile(path) / TextInputFormat

  • explain

Read the text file, pass the file path parameters, and convert the file content to DataSet type.

  • use
// Read local files
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// Read a file from HDFS
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

readTextFileWithValue(path)/ TextValueInputFormat

  • explain

Reads a text file and converts its content to a DataSet<StringValue>. Unlike readTextFile(String), the element type is StringValue, a mutable string type. Storing text in StringValue objects reduces the number of String objects created and so eases garbage-collection pressure.

  • use
// Read local files
DataSet<StringValue> localLines = env.readTextFileWithValue("file:///some/local/file");
// Read a file from HDFS
DataSet<StringValue> hdfsLines = env.readTextFileWithValue("hdfs://host:port/file/path");

readCsvFile(path)/ CsvInputFormat

  • explain

Creates a CSV reader for comma-separated (or otherwise delimited) files. The result can be converted directly to a Tuple or POJO type. You can specify the line delimiter, the field delimiter, the fields to include, and so on.

  • use
// Read a CSV file with five fields, taking only the first and fourth
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")
                               .types(String.class, Double.class);

// Read a CSV file with three fields and convert it to a POJO type
DataSet<Person> csvPojoInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");
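The field mask passed to includeFields() can be hard to read at first. The following is a minimal plain-Java sketch of the idea, assuming that a '1' at position i keeps field i and a '0' drops it. The class IncludeFieldsSketch and its select() method are hypothetical names made up for this illustration; this is not Flink's CSV reader.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not Flink's CSV reader implementation.
class IncludeFieldsSketch {

    // Keeps the i-th field of a row when the i-th mask character is '1'.
    static List<String> select(String[] row, String mask) {
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < mask.length() && i < row.length; i++) {
            if (mask.charAt(i) == '1') {
                kept.add(row[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] row = "alice,30,berlin,1.75,x".split(",");
        // Mask "10010" keeps the first and fourth fields.
        System.out.println(select(row, "10010")); // [alice, 1.75]
    }
}
```

With the mask "10010", only the first and fourth of the five fields survive, which matches the two types given to types(String.class, Double.class) in the snippet above.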

readFileOfPrimitives(path, Class) / PrimitiveInputFormat

  • explain

Reads a file of primitive values (e.g. String, Integer) and returns a DataSet of the corresponding type

  • use
DataSet<String> data = env.readFileOfPrimitives("file:///some/local/file", String.class);

Collection-based data sources

fromCollection(Collection)

  • explain

Creates a DataSet from a Java collection whose elements all have the same type

  • use
DataSet<String> data = env.fromCollection(arrayList);

fromElements(T …)

  • explain

Creates a DataSet from a given sequence of elements, all of which must be of the same type

  • use
DataSet<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");

generateSequence(from, to)

  • explain

Generates the sequence of numbers in the interval from `from` to `to`. Because the sequence is processed in parallel, the order of the results is not guaranteed.

  • use
DataSet<Long> longDataSource = env.generateSequence(1, 20);

General-type data source

The DataSet API provides the generic InputFormat interface for reading data from different sources and in different formats. InputFormats fall into two groups: file-based ones, used with the readFile() method of the DataSet API, and general-purpose ones, for example for reading from an RDBMS or a NoSQL database, used with the createInput() method.

readFile(inputFormat, path) / FileInputFormat

  • explain

Reads files with a custom file input format and converts their contents to a DataSet

  • use
env.readFile(new MyInputFormat(), "file:///some/local/file");

createInput(inputFormat) / InputFormat

  • explain

Reads from a custom, generic data source and converts the result to a DataSet. The following example uses Flink's built-in JDBCInputFormat to read a table from MySQL and produce a DataSet<Row>

  • use
DataSet<Row> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("com.mysql.jdbc.Driver")
                     .setDBUrl("jdbc:mysql://localhost/mydb")
                     .setQuery("select name, age from stu")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

Data Sink

The DataSet API offers three kinds of data output. The first is file-based and corresponds to the write() method of DataSet, which writes the data to a file system. The second targets general storage systems and corresponds to the output() method of DataSet, for example using JDBCOutputFormat to write to a relational database. The third is client-side output, which collects the data from the worker nodes to the client and outputs it there, as the print() method of DataSet does.

Standard data output methods

// Text data
DataSet<String> textData = // [...]

// Write data to a local file
textData.writeAsText("file:///my/result/on/localFS");

// Write data to a file on HDFS
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// Write data to a local file, overwriting it if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// Write tuples to a local CSV file with "\n" as the line delimiter and "|" as the field delimiter
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// Write values as strings using a custom TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple3<String, Integer, Double>>() {
        public String format(Tuple3<String, Integer, Double> value) {
            return value.f1 + "-" + value.f0;
        }
    });

Use custom output types

DataSet<Tuple3<String, Integer, Double>> myResult = // [...]

// Write tuple data to a relational database
myResult.output(
    // Create and configure the OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("com.mysql.jdbc.Driver")
                    .setDBUrl("jdbc:mysql://localhost/mydb")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

DataSet transformations

A transformation turns one DataSet into another. Flink provides a rich set of transformations, used as follows:

Map

Takes one element and produces exactly one element

DataSource<String> source = env.fromElements("I", "like", "flink");
        source.map(new MapFunction<String, String>() {
            @Override
            // Uppercase the data
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        }).print();

FlatMap

Takes one element and produces zero, one, or more elements

stringDataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] split = value.split(" ");
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);

MapPartition

MapPartition is similar to Map, except that it processes the data of a DataSet one partition at a time. Each call receives the elements of one partition as an Iterable and may emit any number of result values. The number of elements in each partition depends on the parallelism.

 source.mapPartition(new MapPartitionFunction<String, Long>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<Long> out) throws Exception {
                long c = 0;
                for (String value : values) {
                    c++;
                }
                // Emit the number of elements in this partition
                out.collect(c);
            }
        }).print();
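To make the "one call per partition" behavior concrete, here is a plain-Java sketch that models it without Flink. It assumes, purely for illustration, that elements are dealt round-robin into partitions; how Flink actually distributes elements is not specified here. MapPartitionSketch, split() and countPerPartition() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of per-partition processing; not Flink code.
class MapPartitionSketch {

    // Deals elements round-robin into `parallelism` partitions.
    // Round-robin is an assumption made for this sketch only.
    static <T> List<List<T>> split(List<T> data, int parallelism) {
        List<List<T>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < data.size(); i++) {
            partitions.get(i % parallelism).add(data.get(i));
        }
        return partitions;
    }

    // Called once per partition: sees all of its elements and emits one count.
    static <T> List<Long> countPerPartition(List<List<T>> partitions) {
        List<Long> counts = new ArrayList<>();
        for (List<T> partition : partitions) {
            long c = 0;
            for (T ignored : partition) {
                c++;
            }
            counts.add(c);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        data.add("I");
        data.add("like");
        data.add("flink");
        System.out.println(countPerPartition(split(data, 2))); // [2, 1]
    }
}
```

The result contains one value per partition rather than one per element, which is the main difference from Map.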

Filter

Filters the data: an element is kept if the function returns true and dropped if it returns false

DataSource<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L);
        source.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                // Keep even numbers only
                return value % 2 == 0;
            }
        }).print();

Project

A projection that selects a subset of the fields of each tuple. It can only be used on Tuple datasets

DataSource<Tuple3<Long, Integer, String>> source = env.fromElements(
                Tuple3.of(1L, 20, "tom"),
                Tuple3.of(2L, 25, "jack"),
                Tuple3.of(3L, 22, "bob"));
        // Keep the first and third fields
        source.project(0, 2).print();

Reduce

Combines the elements of a dataset into a single element by repeatedly merging two elements into one. It can be applied to the entire dataset or to a grouped dataset.

DataSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("Flink", 1),
                Tuple2.of("Flink", 1),
                Tuple2.of("Hadoop", 1),
                Tuple2.of("Spark", 1),
                Tuple2.of("Flink", 1));
        source
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        }).print();

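The pairwise merging that a grouped reduce performs can be modeled in plain Java with Map.merge. This is only a sketch of the semantics, not of how Flink executes the operator; GroupedReduceSketch and reduceByKey() are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a grouped reduce; not Flink code.
class GroupedReduceSketch {

    // Folds the values of each key pairwise with `reducer`:
    // the running result for a key is merged with each new value of that key.
    static Map<String, Integer> reduceByKey(String[] keys, int[] values, BinaryOperator<Integer> reducer) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            result.merge(keys[i], values[i], reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"Flink", "Flink", "Hadoop", "Spark", "Flink"};
        int[] values = {1, 1, 1, 1, 1};
        System.out.println(reduceByKey(keys, values, Integer::sum)); // {Flink=3, Hadoop=1, Spark=1}
    }
}
```

The reducer always receives two values and returns one of the same type, which is why the result of a grouped reduce has exactly one element per key.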

ReduceGroup

Combines the elements of a dataset or of each group, and can be applied to the entire dataset or to a grouped dataset. Unlike Reduce, the function receives all elements of a group as an Iterable and may emit any number of results.

DataSource<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("Flink", 1L),
                Tuple2.of("Flink", 1L),
                Tuple2.of("Hadoop", 1L),
                Tuple2.of("Spark", 1L),
                Tuple2.of("Flink", 1L));
        source
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String,Long>, Tuple2<String,Long>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) throws Exception {
                        Long sum = 0L;
                        String word = "";
                        for(Tuple2<String, Long> value:values){
                            sum += value.f1;
                            word = value.f0;

                        }
                        out.collect(Tuple2.of(word,sum));
                    }
                }).print();


Aggregate

Aggregate combines a set of values into a single value. It can be applied to the entire DataSet or to a grouped DataSet, works only on Tuple datasets, and includes the Sum, Min, and Max functions

DataSource<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("Flink", 1L),
                Tuple2.of("Flink", 1L),
                Tuple2.of("Hadoop", 1L),
                Tuple2.of("Spark", 1L),
                Tuple2.of("Flink", 1L));
        source
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1) // Sum the second field
                .print();

Distinct

Removes duplicate elements from a DataSet

DataSource<Tuple1<String>> source = env.fromElements(Tuple1.of("Flink"), Tuple1.of("Flink"), Tuple1.of("hadoop"));
        source.distinct(0).print(); // Deduplicate on the first field of the tuple
// Result:
// (Flink)
// (hadoop)

Join

By default, a join produces a DataSet of Tuple2, each holding one matching element from the left dataset and one from the right. The join key can be specified by a key expression, a key-selector function, a field position, or (in Scala) a case class field. Two Tuple datasets can be joined by field position: the field of the left dataset is given by where() and the field of the right dataset by equalTo(). For example:

DataSource<Tuple2<Integer,String>> source1 = env.fromElements(
                Tuple2.of(1, "jack"),
                Tuple2.of(2, "tom"),
                Tuple2.of(3, "Bob"));
        DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("order1", 1),
                Tuple2.of("order2", 2),
                Tuple2.of("order3", 3));
        source1.join(source2).where(0).equalTo(1).print();

A custom JoinFunction can be specified for the join. Its input is a pair made of one element from the left dataset and one element from the right dataset, and it returns a single result after processing. For example:

// User ID, purchased item name, purchased quantity
        DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(
                Tuple3.of(1, "item1", 2),
                Tuple3.of(2, "item2", 3),
                Tuple3.of(3, "item3", 4));
        // Item name and unit price
        DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("item1", 10),
                Tuple2.of("item2", 20),
                Tuple2.of("item3", 15));
        source1.join(source2)
                .where(1)
                .equalTo(0)
                .with(new JoinFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple3<Integer,String,Double>>() {
                    // Total amount each user spends on each item
                    @Override
                    public Tuple3<Integer, String, Double> join(Tuple3<Integer, String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                        return Tuple3.of(first.f0, first.f1, first.f2 * second.f1.doubleValue());
                    }
                }).print();

To help Flink process the data efficiently, you can give a size hint when joining DataSets, and Flink adjusts its execution strategy accordingly. For example, joinWithTiny or joinWithHuge indicates the size of the second dataset:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result1 =
            // Hint that the second dataset is very small
            input1.joinWithTiny(input2)
                  .where(0)
                  .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result2 =
            // Hint that the second dataset is very large
            input1.joinWithHuge(input2)
                  .where(0)
                  .equalTo(0);

Flink's runtime can execute joins in several ways, and each way outperforms the others under some circumstances. The system tries to pick a reasonable strategy automatically, but you can also choose one manually, which makes join execution more flexible and efficient.

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
// Broadcast the first input and build a hash table from it, which is probed by the second input; suitable when the first dataset is very small
DataSet<Tuple2<SomeType, AnotherType>> result1 =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");
// Broadcast the second input and build a hash table from it, which is probed by the first input; suitable when the second dataset is very small
DataSet<Tuple2<SomeType, AnotherType>> result2 =
      input1.join(input2, JoinHint.BROADCAST_HASH_SECOND)
            .where("id").equalTo("key");
// Repartition both datasets and build a hash table from the first; suitable when the first dataset is smaller than the second but both are fairly large
DataSet<Tuple2<SomeType, AnotherType>> result3 =
      input1.join(input2, JoinHint.REPARTITION_HASH_FIRST)
            .where("id").equalTo("key");
// Repartition both datasets and build a hash table from the second; suitable when the second dataset is smaller than the first but both are fairly large
DataSet<Tuple2<SomeType, AnotherType>> result4 =
      input1.join(input2, JoinHint.REPARTITION_HASH_SECOND)
            .where("id").equalTo("key");
// Repartition both datasets and sort each partition; suitable when the datasets are already sorted
DataSet<Tuple2<SomeType, AnotherType>> result5 =
      input1.join(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");
// Equivalent to giving no hint: the system chooses the strategy itself
DataSet<Tuple2<SomeType, AnotherType>> result6 =
      input1.join(input2, JoinHint.OPTIMIZER_CHOOSES)
            .where("id").equalTo("key");

OuterJoin

OuterJoin performs a left, right, or full outer join of two datasets, through the leftOuterJoin, rightOuterJoin, and fullOuterJoin methods of the DataSet API respectively. Note that outer joins are available only in the Java and Scala DataSet APIs.

The usage is similar to join:

// Left outer join
source1.leftOuterJoin(source2).where(1).equalTo(0);
// Right outer join
source1.rightOuterJoin(source2).where(1).equalTo(0);
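What distinguishes an outer join from a regular join is what happens to unmatched elements. The plain-Java sketch below models a left outer join: every left element appears in the result, and a left element without a partner is paired with null. It assumes, for simplicity, that keys on the right side are unique; LeftOuterJoinSketch and leftOuterJoin() are hypothetical names, and this is not Flink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of left outer join semantics; not Flink code.
class LeftOuterJoinSketch {

    // `left` holds {key, value} pairs; `right` maps a key to its (unique) value.
    // Every left row is emitted; a missing right partner shows up as null.
    static List<String> leftOuterJoin(List<String[]> left, Map<String, String> right) {
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            String r = right.get(l[0]); // null when there is no match
            out.add(l[0] + ":" + l[1] + ":" + r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> purchases = new ArrayList<>();
        purchases.add(new String[]{"item1", "2"});
        purchases.add(new String[]{"item9", "4"});
        Map<String, String> prices = new HashMap<>();
        prices.put("item1", "10");
        System.out.println(leftOuterJoin(purchases, prices)); // [item1:2:10, item9:4:null]
    }
}
```

A right outer join mirrors this by keeping every right element, and a full outer join keeps unmatched elements from both sides, so code that consumes the joined pairs needs to handle a null partner.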

Outer joins also accept join algorithm hints, so you can pick a suitable strategy based on the distribution of the left and right datasets and improve processing efficiency. The hints have the same meaning as for join above.

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType>> result1 =
      input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");

DataSet<Tuple2<SomeType, AnotherType>> result2 =
      input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");

The algorithms available for outer joins differ from those for join: each outer join type supports only some of them, as follows:

  • LeftOuterJoin supports:
    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_SECOND
    • REPARTITION_HASH_SECOND
    • REPARTITION_SORT_MERGE
  • RightOuterJoin supports:
    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_FIRST
    • REPARTITION_HASH_FIRST
    • REPARTITION_SORT_MERGE
  • FullOuterJoin supports:
    • OPTIMIZER_CHOOSES
    • REPARTITION_SORT_MERGE

CoGroup

CoGroup jointly processes the groups of two DataSets. Each DataSet is first grouped by key, and the groups that share a key are then passed together to a user-defined CoGroupFunction. If a key has records in only one of the datasets, the CoGroupFunction is called with that group and an empty group for the other side.

// User ID, purchased item name, purchased quantity
        DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(
                Tuple3.of(1, "item1", 2),
                Tuple3.of(2, "item2", 3),
                Tuple3.of(3, "item2", 4));
        // Item name and unit price
        DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("item1", 10),
                Tuple2.of("item2", 20),
                Tuple2.of("item3", 15));

        source1.coGroup(source2)
                .where(1)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple2<String,Double>>() {
                    // Each Iterable holds one group, i.e. the elements that share the same key
                    @Override
                    public void coGroup(Iterable<Tuple3<Integer, String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<String, Double>> out) throws Exception {
                        // Total purchased quantity of this item
                        int sum = 0;
                        for (Tuple3<Integer, String, Integer> val1 : first) {
                            sum += val1.f2;
                        }
                        // Quantity of the item * unit price
                        for (Tuple2<String, Integer> val2 : second) {
                            out.collect(Tuple2.of(val2.f0, sum * val2.f1.doubleValue()));
                        }
                    }
                }).print();
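The empty-group case is easy to miss, so here is a plain-Java sketch that mirrors the example above: for every key found in either input, the function sees both groups, and a missing side is an empty list. The inputs are assumed to be already grouped by key; CoGroupSketch and totals() are hypothetical names, and this is not Flink code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of coGroup semantics; not Flink code.
class CoGroupSketch {

    // For each key in either input, sums the quantities (first group)
    // and multiplies the sum by each unit price (second group).
    static Map<String, Double> totals(Map<String, List<Integer>> quantities,
                                      Map<String, List<Integer>> prices) {
        TreeSet<String> keys = new TreeSet<>(quantities.keySet());
        keys.addAll(prices.keySet());
        Map<String, Double> out = new TreeMap<>();
        for (String key : keys) {
            // A key missing on one side yields an empty group for that side.
            List<Integer> first = quantities.getOrDefault(key, Collections.<Integer>emptyList());
            List<Integer> second = prices.getOrDefault(key, Collections.<Integer>emptyList());
            int sum = 0;
            for (int q : first) {
                sum += q;
            }
            for (int p : second) {
                out.put(key, (double) sum * p);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> quantities = new HashMap<>();
        quantities.put("item1", Arrays.asList(2));
        quantities.put("item2", Arrays.asList(3, 4));
        Map<String, List<Integer>> prices = new HashMap<>();
        prices.put("item1", Arrays.asList(10));
        prices.put("item2", Arrays.asList(20));
        prices.put("item3", Arrays.asList(15));
        System.out.println(totals(quantities, prices)); // {item1=20.0, item2=140.0, item3=0.0}
    }
}
```

Here item3 has a price but no purchases, so its quantity group is empty and the total is 0.0. A regular join would have dropped item3 altogether.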

Cross

Cross combines two datasets into one by returning the Cartesian product of their elements, so the number of results equals the number of elements in the first dataset multiplied by the number of elements in the second. A CrossFunction can turn each pair into the desired output format; if none is specified, the result is a dataset of Tuple2. Cross is computation-intensive, so it is recommended to use it with the size hints crossWithTiny() and crossWithHuge().

// [id, x, y]: point id and coordinates
        DataSet<Tuple3<Integer, Integer, Integer>> coords1 = env.fromElements(
                Tuple3.of(1, 20, 18),
                Tuple3.of(2, 15, 20),
                Tuple3.of(3, 25, 10));
        DataSet<Tuple3<Integer, Integer, Integer>> coords2 = env.fromElements(
                Tuple3.of(1, 20, 18),
                Tuple3.of(2, 15, 20),
                Tuple3.of(3, 25, 10));
        // Compute the Euclidean distance between every pair of points

        coords1.cross(coords2)
                .with(new CrossFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Double>>() {
                    @Override
                    public Tuple3<Integer, Integer, Double> cross(Tuple3<Integer, Integer, Integer> val1, Tuple3<Integer, Integer, Integer> val2) throws Exception {
                        // Calculate the Euclidean distance
                        double dist = Math.sqrt(Math.pow(val1.f1 - val2.f1, 2) + Math.pow(val1.f2 - val2.f2, 2));
                        // Return the ids of the two points and their distance
                        return Tuple3.of(val1.f0, val2.f0, dist);
                    }
                }).print();

Union

Merges two DataSets whose elements have the same type. Several datasets can be merged by chaining union calls.

        DataSet<Tuple2<String, Integer>> vals1 = env.fromElements(
                Tuple2.of("jack", 20),
                Tuple2.of("Tom", 21));
        DataSet<Tuple2<String, Integer>> vals2 = env.fromElements(
                Tuple2.of("Robin", 25),
                Tuple2.of("Bob", 30));
        DataSet<Tuple2<String, Integer>> vals3 = env.fromElements(
                Tuple2.of("Jasper", 24),
                Tuple2.of("jarry", 21));
        DataSet<Tuple2<String, Integer>> unioned = vals1
                .union(vals2)
                .union(vals3);
        unioned.print();

Rebalance

Evenly rebalances the data in the dataset so that every partition holds about the same amount of data, which reduces the impact of data skew. Note that only map-like operators (such as Map and FlatMap) can follow Rebalance.

DataSet<String> in = // [...]
// Rebalance the DataSet, then apply a map operator
DataSet<Tuple2<String, String>> out = in.rebalance()
                                        .map(new Mapper());

Hash-Partition

Hash-partitions a dataset on the given key: records with the same key go to the same partition. The key can be specified by field position, by field name, or by a key-selector function.

DataSet<Tuple2<String, Integer>> in = // [...]
// Hash-partition on the first field, then apply a MapPartition transformation
DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
                                        .mapPartition(new PartitionMapper());
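The reason equal keys always land in the same partition is that the partition index is a pure function of the key. The sketch below shows the general idea using hashCode() modulo the number of partitions. This formula is an assumption chosen for illustration, and the hash function Flink uses internally may differ. HashPartitionSketch and partitionFor() are hypothetical names.

```java
// Plain-Java illustration of hash partitioning; Flink's internal hashing may differ.
class HashPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"Flink", "Hadoop", "Spark", "Flink"};
        for (String key : keys) {
            // Both occurrences of "Flink" print the same partition index.
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

Because the index depends only on the key, a skewed key distribution translates directly into skewed partitions, which is the situation Rebalance is meant to relieve.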

Range-Partition

Range-partitions a dataset on the given key: the key space is split into ranges and each range is assigned to one partition, so records with the same key end up in the same partition. The key can be specified by field position, by field name, or by a key-selector function.

DataSet<Tuple2<String, Integer>> in = // [...]
// Range-partition on the first field, then apply a MapPartition transformation
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
                                        .mapPartition(new PartitionMapper());

Custom Partitioning

In addition to the partitioning above, custom partitioning functions are supported.

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(partitioner, key)
                            .mapPartition(new PartitionMapper());

Sort Partition

Locally sorts every partition of the DataSet on the specified fields. The sort order is given by Order.ASCENDING or Order.DESCENDING. Multiple fields can be specified by chaining sortPartition() calls, as follows:

DataSet<Tuple2<String, Integer>> in = // [...]
// Sort each partition by the second field in ascending order, then by the first field in descending order
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
                                        .sortPartition(0, Order.DESCENDING)
                                        .mapPartition(new PartitionMapper());

First-n

Returns the first n (arbitrary) elements of a dataset. It can be applied to a regular dataset, a grouped dataset, or a grouped and sorted dataset.

DataSet<Tuple2<String, Integer>> in = // [...]
// Return 5 arbitrary elements of the dataset
DataSet<Tuple2<String, Integer>> out1 = in.first(5);
// Return 2 arbitrary elements of each group
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
                                          .first(2);
// Return the first 3 elements of each group,
// with each group sorted in ascending order by the second field
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
                                          .sortGroup(1, Order.ASCENDING)
                                          .first(3);
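The third variant (group, sort each group, take the first n) is effectively a "top n per key" query. The plain-Java sketch below models that combination; FirstNSketch and firstNPerGroup() are hypothetical names, and this is not Flink code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of groupBy + sortGroup(ascending) + first(n); not Flink code.
class FirstNSketch {

    static Map<String, List<Integer>> firstNPerGroup(List<Map.Entry<String, Integer>> rows, int n) {
        // Group the values by key.
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> row : rows) {
            groups.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        // Sort each group in ascending order and keep at most n values.
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            List<Integer> sorted = new ArrayList<>(group.getValue());
            Collections.sort(sorted);
            group.setValue(new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size()))));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        rows.add(new AbstractMap.SimpleEntry<>("a", 5));
        rows.add(new AbstractMap.SimpleEntry<>("a", 1));
        rows.add(new AbstractMap.SimpleEntry<>("a", 3));
        rows.add(new AbstractMap.SimpleEntry<>("a", 2));
        rows.add(new AbstractMap.SimpleEntry<>("b", 9));
        System.out.println(firstNPerGroup(rows, 3)); // {a=[1, 2, 3], b=[9]}
    }
}
```

Without the sort step, which n elements come back is arbitrary, so sorting the group first is what makes the result deterministic.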

MinBy / MaxBy

Returns the record with the smallest or largest value of one or more specified fields. If several records share that value, an arbitrary one of them is returned.

DataSet<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("jack", 20),
                Tuple2.of("Tom", 21),
                Tuple2.of("Robin", 25),
                Tuple2.of("Bob", 30));
// Find the tuple whose second field is the smallest
// Use minBy on the entire DataSet
ReduceOperator<Tuple2<String, Integer>> tuple2Reduce = source.minBy(1);
tuple2Reduce.print(); // prints (jack,20)

// minBy can also be used on a grouped DataSet
source.groupBy(0) // Group by the first field
      .minBy(1)   // In each group, find the tuple whose second field is the smallest
      .print();

Broadcast variables

Basic concept

Broadcast variables are a data-sharing mechanism common in distributed computing frameworks. A small dataset is shipped over the network once, and each machine keeps a read-only cached copy of it. Compute instances on that machine can then read the dataset directly from local memory instead of repeatedly fetching it from remote nodes, which improves overall job performance.

A broadcast variable can be thought of as a shared variable: a DataSet is broadcast so that different tasks can read it, and only one copy is kept on each node. Without broadcast variables, every task on a node would hold its own copy of the dataset, wasting memory.

The basic steps for using broadcast variables are as follows:

// Step 1: create the data set to broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
        // Step 3: access the broadcast data set as a Collection
        Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }

    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // Step 2: broadcast the data set


As you can see from the above code, the DataSet API supports reading broadcast variables through RuntimeContext in the RichFunction interface.

We first override the open() method of the RichFunction, call getRuntimeContext() to obtain the RuntimeContext, and then call getBroadcastVariable() with the broadcast name to fetch the broadcast variable. Flink hands the broadcast data set to the function as a local Collection. Note that the element type of the Collection must match the element type of the broadcast DataSet; otherwise type conversion errors will occur.

Notes:

  • Because the contents of a broadcast variable are kept in memory on each node, the broadcast data set should not be too large.
  • Once a broadcast variable has been initialized it cannot be modified, which ensures that every node sees the same data.
  • If several operators need the same data set, it must be registered as a broadcast variable on each of those operators separately.
  • Broadcast variables can only be used in batch processing.
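One way to respect the read-only note in user code is to build the lookup table once and wrap it so that accidental writes fail fast. The following is a minimal plain-Java sketch with no Flink dependency. The class `BroadcastLookupSketch` and the method `toReadOnlyLookup` are hypothetical names, and the input list merely stands in for the List a rich function would receive from getBroadcastVariable() when each broadcast record is a small map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastLookupSketch {
    // Merge the per-record maps into a single lookup table that rejects writes.
    static Map<Integer, String> toReadOnlyLookup(List<Map<Integer, String>> broadcastList) {
        Map<Integer, String> merged = new HashMap<>();
        for (Map<Integer, String> record : broadcastList) {
            merged.putAll(record);
        }
        return Collections.unmodifiableMap(merged);
    }

    public static void main(String[] args) {
        // Stand-in for the broadcast data: one single-entry map per user.
        List<Map<Integer, String>> broadcastList = new ArrayList<>();
        broadcastList.add(Collections.singletonMap(1, "jack"));
        broadcastList.add(Collections.singletonMap(2, "tom"));
        broadcastList.add(Collections.singletonMap(3, "Bob"));

        Map<Integer, String> lookup = toReadOnlyLookup(broadcastList);
        System.out.println(lookup.get(2));

        try {
            lookup.put(4, "eve"); // any write attempt is rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("lookup is read-only");
        }
    }
}
```

In a real job the merge would typically happen inside open(), so the lookup is built once per parallel instance rather than once per record.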

Usage demo

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> rawBroadCastData = new ArrayList<>();

        rawBroadCastData.add(new Tuple2<>(1, "jack"));
        rawBroadCastData.add(new Tuple2<>(2, "tom"));
        rawBroadCastData.add(new Tuple2<>(3, "Bob"));

        // Data to broadcast: user ID and user name, [userId, userName]
        DataSource<Tuple2<Integer, String>> userInfoBroadCastData = env.fromCollection(rawBroadCastData);

        ArrayList<Tuple2<Integer, Double>> rawUserAmount = new ArrayList<>();

        rawUserAmount.add(new Tuple2<>(1, 1000.00));
        rawUserAmount.add(new Tuple2<>(2, 500.20));
        rawUserAmount.add(new Tuple2<>(3, 800.50));

        // Data to process: user ID and purchase amount, [userId, amount]
        DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAmount);

        // Convert each tuple of the DataSet into a single-entry map
        DataSet<HashMap<Integer, String>> userInfoBroadCast = userInfoBroadCastData.map(new MapFunction<Tuple2<Integer, String>, HashMap<Integer, String>>() {

            @Override
            public HashMap<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
                HashMap<Integer, String> userInfo = new HashMap<>();
                userInfo.put(value.f0, value.f1);
                return userInfo;
            }
        });

        DataSet<String> result = userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
            // Holds the list returned by the broadcast variable; the element type
            // must match the broadcast DataSet (HashMap<Integer, String>)
            List<HashMap<Integer, String>> broadCastList = new ArrayList<>();
            // Holds all broadcast entries merged into one lookup map
            HashMap<Integer, String> allMap = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Get the broadcast data, which is returned as a list
                this.broadCastList = getRuntimeContext().getBroadcastVariable("userInfo");
                for (HashMap<Integer, String> value : broadCastList) {
                    allMap.putAll(value);
                }
            }

            @Override
            public String map(Tuple2<Integer, Double> value) throws Exception {
                String userName = allMap.get(value.f0);
                return "User ID:" + value.f0 + "|" + "Username:" + userName + "|" + "Purchase Amount:" + value.f1;
            }
        }).withBroadcastSet(userInfoBroadCast, "userInfo");

        result.print();
    }
}

Distributed cache

Basic concepts

Flink provides a distributed cache, similar to Hadoop's, that makes files locally accessible to parallel instances of user functions. A program registers a file or directory (on a local or remote file system such as HDFS) through the ExecutionEnvironment and gives it an alias. When the program is executed, Flink automatically copies the registered file or directory to the local file system of every TaskManager node. A user function can then look up the file or directory by its alias and read it from the TaskManager node’s local file system.

How to use the distributed cache:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Register an HDFS file
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
// Register a local executable file (the third argument marks it as executable)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);
// Access the data from inside a RichFunction
getRuntimeContext().getDistributedCache().getFile("hdfsFile");



Cached files are retrieved in much the same way as broadcast variables: implement the RichFunction interface, obtain the RuntimeContext through it, and then fetch the local copy of the cached file through the interface that RuntimeContext provides.

Usage demo

import org.apache.commons.io.FileUtils; // assumption: FileUtils is the Apache Commons IO class
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class DistributeCacheExample {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        /*
         * Register a local file. The file contents are as follows:
         * 1,"jack"
         * 2,"Tom"
         * 3,"Bob"
         */
        env.registerCachedFile("file:///E://userinfo.txt", "localFileUserInfo", true);

        ArrayList<Tuple2<Integer, Double>> rawUserAmount = new ArrayList<>();
        rawUserAmount.add(new Tuple2<>(1, 1000.00));
        rawUserAmount.add(new Tuple2<>(2, 500.20));
        rawUserAmount.add(new Tuple2<>(3, 800.50));

        // Data to process: user ID and purchase amount, [userId, amount]
        DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAmount);

        DataSet<String> result = userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
            // Lookup table built from the cached file, keyed by user ID
            HashMap<Integer, String> allMap = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Get the local copy of the distributed cache file
                File userInfoFile = getRuntimeContext().getDistributedCache().getFile("localFileUserInfo");
                List<String> userInfo = FileUtils.readLines(userInfoFile, "UTF-8");
                for (String value : userInfo) {
                    String[] split = value.split(",");
                    // Parse the ID as an Integer so that the lookup by value.f0 finds it
                    allMap.put(Integer.parseInt(split[0].trim()), split[1].trim());
                }
            }

            @Override
            public String map(Tuple2<Integer, Double> value) throws Exception {
                String userName = allMap.get(value.f0);
                return "User ID:" + value.f0 + "|" + "Username:" + userName + "|" + "Purchase Amount:" + value.f1;
            }
        });

        result.print();
    }
}
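The parsing step performed in open() can be tried out on its own, without a Flink cluster. The following is a minimal plain-Java sketch under a few assumptions: the cached file holds one `id,"name"` pair per line, blank or malformed lines should be skipped, and the surrounding quotes should be stripped from the name. The class `CacheFileParseSketch` and the method `parseUserInfo` are hypothetical names, and a temporary file stands in for the file Flink would copy to the TaskManager.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheFileParseSketch {
    // Parse lines of the form id,"name" into an id -> name lookup table.
    static Map<Integer, String> parseUserInfo(List<String> lines) {
        Map<Integer, String> users = new HashMap<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            String[] parts = line.split(",", 2);
            if (parts.length < 2) {
                continue; // skip lines without a comma
            }
            int id = Integer.parseInt(parts[0].trim());
            String name = parts[1].trim().replace("\"", ""); // drop surrounding quotes
            users.put(id, name);
        }
        return users;
    }

    public static void main(String[] args) throws IOException {
        // A temporary file stands in for the locally cached copy.
        Path file = Files.createTempFile("userinfo", ".txt");
        Files.write(file, Arrays.asList("1,\"jack\"", "2,\"Tom\"", "3,\"Bob\""), StandardCharsets.UTF_8);

        Map<Integer, String> users = parseUserInfo(Files.readAllLines(file, StandardCharsets.UTF_8));
        System.out.println(users.get(2));

        Files.delete(file);
    }
}
```

Keying the map by Integer matters here: the purchase records carry the user ID as an Integer, so a map keyed by the raw String from the file would never match.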

Summary

This article covered the basic use of the Flink DataSet API. It started with a DataSet API WordCount example, then introduced the DataSet API's data sources and sinks and their basic use. Each transformation was then explained in detail with a concrete use case. Finally, the concepts of broadcast variables and the distributed cache were explained, with a complete demo showing how to use each of these two advanced features.
