1. Getting started with Flink

1. Flink overview

What is Flink

  • Apache Flink – Stateful Computations over Data Streams
  • Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Unbounded vs Bounded data

  • Simply put, Flink is a distributed stream processing framework that can efficiently process both bounded and unbounded data streams.
  • The core of Flink is stream processing, but it also supports batch processing; Flink regards batch processing as a special case of stream processing, i.e., a data stream with clear boundaries. This is the exact opposite of Spark Streaming, whose core is batch processing and which treats stream processing as a special case of batch processing by splitting the data stream into micro-batches of minimal granularity.

  • Both can be processed with Flink: bounded streams correspond to batch processing and unbounded streams to stream processing (see the sketch below).
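The distinction is reflected in the two API entry points. A minimal Java sketch, assuming an illustrative file path, host, and port (these are not from the original text):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedVsUnbounded {

    public static void main(String[] args) throws Exception {
        // Bounded data: the file has a clear beginning and end, so this is a batch job
        // that finishes on its own (print() triggers execution in the DataSet API).
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        batchEnv.readTextFile("file:///tmp/hello.txt").print();

        // Unbounded data: a socket never "ends", so this is a streaming job that
        // keeps running until it is cancelled.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.socketTextStream("localhost", 9999).print();
        streamEnv.execute("unbounded-part");
    }
}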

2. Flink Layered API

  • Flink adopts a layered architecture design to ensure that the functions and responsibilities of each layer are clear.

  • SQL & Table API
    • The SQL & Table API is suitable for both batch and stream processing, which means you can query bounded and unbounded streams with the same semantics and produce the same results. In addition to basic queries, it also supports custom scalar functions, aggregate functions and table-valued functions, which can meet a variety of query requirements.
  • DataStream & DataSet API
    • The DataStream & DataSet APIs are the core data-processing APIs of Flink. They can be used from Java or Scala and encapsulate a series of common operations such as reading data, transforming data, and outputting data.
  • Stateful Stream Processing
    • Stateful Stream Processing is the lowest level of abstraction, which is embedded into the DataStream API through the Process Function. Process Function is the lowest-level API provided by Flink and has the greatest flexibility, giving developers fine-grained control over time and state (see the sketch below).
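As an illustration of this lowest layer, here is a hedged Java sketch of a KeyedProcessFunction that keeps per-key state and registers a processing-time timer. The class name, the Tuple2 input type, and the one-second delay are illustrative assumptions, not something prescribed by the original text:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts events per key and emits the current count one second after each element
// arrives, demonstrating the state + timer access that the Process Function exposes.
public class CountWithTimerFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        Long current = countState.value();
        countState.update(current == null ? 1L : current + 1);

        // fine-grained control over time: fire a timer one second from now
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        out.collect(ctx.getCurrentKey() + " -> " + countState.value());
    }
}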

2. Develop your first Flink application quickly

1. Prepare the development environment

  • JDK
  • Maven

2. Develop a batch application using Flink

Requirement description

  • Word Frequency Statistics
    • Given a file, count the number of occurrences of each word in it
    • The delimiter is \t
    • Print the results directly to the console (in production the results must be written to a proper Sink destination)

Flink + Java

  • The only requirements are a working Maven 3.0.4 (or higher) and Java 8.x installation.
  • Create the project
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.8.0
  • Development process (the usual boilerplate steps)
    • Set up the batch execution environment
    • Read the data
    • Transform: develop the business logic
    • Execute the program
  • Breaking the task down
    • Read the data
    • Split each row on the specified delimiter
    • Assign a count of 1 to each word
    • Merge (group and sum) the counts
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by THPFFCJ on 2019-06-28.
 */
public class BatchWCJavaApp {

    public static void main(String[] args) throws Exception {
        String input = "file:///Users/thpffcj/Public/data/hello.txt";

        // step1: obtain the runtime environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // step2: read the data
        DataSource<String> text = env.readTextFile(input);

        // step3: transform
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split("\t");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print();  // step4: output the result (print() triggers execution for a batch job)
    }
}

3. Develop a real-time processing application using Flink

Flink + Scala

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by THPFFCJ on 2019-06-29.
 */
object StreamingWCScalaApp {

  def main(args: Array[String]): Unit = {

    // step1: obtain the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // step2: read data from a socket
    val text = env.socketTextStream("localhost", 9999)

    import org.apache.flink.api.scala._

    // step3: transform
    text.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute("StreamingWCScalaApp")
  }
}

3. Flink programming model and core concepts

1. DataSet & DataStream

  • DataSet: for batch processing. Its interface is similar to Spark’s Dataset and supports various functional operations such as map, filter, join, and coGroup.
  • DataStream: for stream processing. It encapsulates the handling of input streams and likewise provides rich function support.
  • The implementation of DataSet lives in the flink-java module, while that of DataStream lives in the flink-streaming-java module.

2. Flink programming model

  • Anatomy of a Flink Program
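Every Flink program follows the same anatomy: obtain an execution environment, load or create the initial data, specify transformations, specify where to put the results, and trigger execution. A minimal Java sketch in which the socket source and the trivial filter are just placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProgramAnatomy {

    public static void main(String[] args) throws Exception {
        // 1. obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. load/create the initial data (source)
        // 3. specify transformations on this data
        // 4. specify where to put the results of the computation (sink)
        env.socketTextStream("localhost", 9999)
                .filter(line -> !line.isEmpty())
                .print();

        // 5. trigger the program execution
        env.execute("ProgramAnatomy");
    }
}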

3. System architecture

Flink requires four different components: JobManager, ResourceManager, TaskManager and Dispatcher

  • JobManagers (also known as masters): The JobManager receives the application to execute from the Dispatcher; the application contains the JobGraph (the logical dataflow graph) together with all the class files, third-party libraries, and other resources it needs. The JobManager then converts the JobGraph into an ExecutionGraph and requests resources from the ResourceManager to execute the tasks. Once resources are obtained, it distributes the ExecutionGraph to the TaskManagers. Every job has at least one JobManager; in a high-availability deployment there can be several JobManagers, one of which is the leader while the others are standby.
  • ResourceManager: Manages slots and coordinates cluster resources. The ResourceManager receives resource requests from the JobManager and assigns TaskManagers with available slots to the JobManager to execute tasks. Flink provides different ResourceManagers for different deployment platforms such as YARN, Mesos, and Kubernetes. When the TaskManagers do not have enough slots to execute the tasks, the ResourceManager requests additional resources from the underlying platform.
  • TaskManagers (also known as workers): TaskManagers are responsible for executing the actual subtasks. Each TaskManager has a certain number of slots; a slot is a fixed-size set of resources (such as computing power and memory). After a TaskManager starts, it registers its slots with the ResourceManager, which manages them centrally.
  • Dispatcher: Receives the application submitted by the client and passes it on to a JobManager. It also runs a Web UI for monitoring job execution.

Task execution

A TaskManager allows multiple tasks to be executed simultaneously. These tasks can belong to the same operator (data parallelism), to different operators (task parallelism), or even come from different applications (job parallelism).

  • If the maximum operator parallelism is 4, at least 4 processing slots are needed to execute the application.
  • Scheduling several tasks into the slots of one TaskManager has an advantage: multiple tasks in the same TaskManager can exchange data efficiently within the same process without going over the network.
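As a rough illustration of how parallelism relates to slots, here is a hedged Java sketch; the operator chain and the numbers are arbitrary examples, and taskmanager.numberOfTaskSlots is the standard flink-conf.yaml key that controls how many slots each TaskManager offers:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // default parallelism for all operators of this job
        env.setParallelism(4);

        env.socketTextStream("localhost", 9999)
                .map(String::toLowerCase).setParallelism(4)  // 4 parallel subtasks -> needs 4 slots
                .print().setParallelism(1);                  // the sink runs as a single subtask

        // The slots themselves come from the TaskManagers: each TaskManager offers
        // "taskmanager.numberOfTaskSlots" slots (flink-conf.yaml), e.g. 2 TMs x 2 slots = 4 slots.
        env.execute("ParallelismDemo");
    }
}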

High availability setup

  • To recover from a failure, the system first restarts the failed process, then restarts the application and restores its state.
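State restoration relies on periodic checkpoints plus a restart strategy. A minimal, hedged configuration sketch; the checkpoint interval and retry values are arbitrary examples, not recommendations from the original text:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoveryConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint of all operator state every 10 seconds
        env.enableCheckpointing(10_000);

        // on failure, restart the job up to 3 times, waiting 10 seconds in between;
        // state is then restored from the latest completed checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        env.socketTextStream("localhost", 9999).print();
        env.execute("RecoveryConfigSketch");
    }
}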

4. Using the basic DataSet API

1. DataSet API development overview

The simplest general data-processing flow is easy to imagine: a data input (Source), some data processing (Transform), and a data output (Sink).

2. DataSource

  • File-based
  • Collection-based

Create a DataSet from a collection

  • Scala implementation
object DataSetDataSourceApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    fromCollection(env)
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}
  • Java implementation
public class JavaDataSetDataSourceApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        fromCollection(env);
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Create a DataSet from a file

  • Scala implementation
def textFile(env: ExecutionEnvironment): Unit = {
  val filePath = "file:///Users/thpffcj/Public/data/hello.txt"
  env.readTextFile(filePath).print()
}
  • Java implementation
public static void textFile(ExecutionEnvironment env) throws Exception {
    String filePath = "file:///Users/thpffcj/Public/data/hello.txt";
    env.readTextFile(filePath).print();
}

Create a DataSet from a CSV file

case class MyCaseClass(name:String, age:Int)

def csvFile(env: ExecutionEnvironment): Unit = {

  import org.apache.flink.api.scala._
  val filePath = "file:///Users/thpffcj/Public/data/people.csv"

  env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
  env.readCsvFile[(String, Int)](filePath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()
  env.readCsvFile[MyCaseClass](filePath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()
  // Person is a POJO with name, age and work fields (definition not shown here)
  env.readCsvFile[Person](filePath, ignoreFirstLine = true, pojoFields = Array("name", "age", "work")).print()
}

Create a DataSet from nested folders (recursive reads)

def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
  val filePath = "file:///Users/thpffcj/Public/data/nested"
  val parameters = new Configuration()
  parameters.setBoolean("recursive.file.enumeration", true)
  env.readTextFile(filePath).withParameters(parameters).print()
}

3. Transformation

  • A transformation converts one or more DataSets into a new DataSet

Map

Map can be understood as a mapping: each element is transformed in some way and mapped to another element.

public static void mapFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);

    data.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer input) throws Exception {
            return input + 1;
        }
    }).print();
}

Filter

The Filter transformation applies a user-defined filter function to each element of the DataSet and retains only the elements for which the function returns true.

data.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer input) throws Exception {
        return input + 1;
    }
}).filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer input) throws Exception {
        return input > 5;
    }
}).print();

FlatMap

FlatMap can be understood as flattening elements: each input element can produce zero, one, or more output elements.

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")

  val data = env.fromCollection(info)
  data.map(_.split(",")).print()
  data.flatMap(_.split(",")).print()
  data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
}

Distinct

Returns the distinct elements of the dataset.

def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")

  val data = env.fromCollection(info)

  data.flatMap(_.split(",")).distinct().print()
}

Join

Joins two data sets by creating all pairs of elements that are equal on their keys.

def joinFunction(env: ExecutionEnvironment): Unit = {

  // id, name
  val info1 = ListBuffer[(Int, String)]()
  info1.append((1, "Thpffcj1"))
  info1.append((2, "Thpffcj2"))
  info1.append((3, "Thpffcj3"))
  info1.append((4, "Thpffcj4"))

  // id, city
  val info2 = ListBuffer[(Int, String)]()
  info2.append((1, "Nanjing"))
  info2.append((2, "Beijing"))
  info2.append((3, "Shanghai"))
  info2.append((5, "Chengdu"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}

4. Sink

Data sinks consume DataSets; a data sink operation is described by an OutputFormat.

object DataSetSinkApp {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = 1.to(10)
    val text = env.fromCollection(data)

    val filePath = "file:///Users/thpffcj/Public/data/sink-out"
    text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)

    env.execute("DataSetSinkApp")}}Copy the code

5. Using the basic DataStream API

1. Data Source

Socket-based

object DataStreamSourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)

    env.execute("DataStreamSourceApp")
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {

    val data = env.socketTextStream("localhost", 9999)
    data.print()
  }
}
  • We use Netcat to send data
nc -lk 9999

Custom: implementing custom data sources

  • Method 1: Implementing the SourceFunction for non-parallel sources
class CustomNonParallelSourceFunction extends SourceFunction[Long]{

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
  • Next we need to add the data source to the environment
object DataStreamSourceApp {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    nonParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.print()
  }
}
  • Method 2: Implementing the ParallelSourceFunction Interface
class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  // run() and cancel() can be implemented the same way as in CustomNonParallelSourceFunction above (omitted here)
}
  • With this version we can set the parallelism of the source
def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomParallelSourceFunction).setParallelism(2)
  data.print()
}
  • Method 3: extend RichParallelSourceFunction for parallel sources
class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  // same run()/cancel() logic as above; the Rich variant additionally offers open()/close() lifecycle methods
}

2. Transformation

You can convert one or more DataStreams into a new DataStream.

Map and Filter

  • We generate the data directly using the custom data source above
def filterFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)

  data.map(x =>{
    println("received: " + x)
    x
  }).filter(_%2 == 0).print().setParallelism(1)
}

Union

A Union can merge multiple streams into a single stream for unified processing of the merged streams.

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data1 = env.addSource(new CustomNonParallelSourceFunction)
  val data2 = env.addSource(new CustomNonParallelSourceFunction)
  data1.union(data2).print().setParallelism(1)
}

Split and Select

Split Splits a stream into multiple streams.

Select Selects one or more streams from the split stream.

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)

  val splits = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")}else {
        list.add("odd")
      }
      list
    }
  })

  splits.select("even").print().setParallelism(1)
}

3. Sink

Custom Sink

  • Requirement: receive data from a socket, convert each String into a Java object, and save the object to a MySQL database
  • Create the database table
create table student(
  id int(11) NOT NULL AUTO_INCREMENT,
  name varchar(25),
  age int(10),
  primary key(id)
);
  • Extend RichSinkFunction<T>, where T is the type of object to be written
  • Override open/close: lifecycle methods
  • Override invoke: executed once per record
public class SinkToMySQL extends RichSinkFunction<Student> {

    Connection connection;
    PreparedStatement preparedStatement;

    private Connection getConnection() {
        Connection conn = null;
        try {
            String url = "jdbc:mysql://localhost:3306/test";
            conn = DriverManager.getConnection(url, "root", "00000000");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        connection = getConnection();
        String sql = "insert into Student(id, name, age) values (? ,? ,?) "; preparedStatement = connection.prepareStatement(sql); } public void invoke(Student value, Student value) Throws Exception {preparedStatement.setint (1, value.getid ()); preparedStatement.setint (1, value.getid ()); preparedStatement.setString(2, value.getName()); preparedStatement.setInt(3, value.getAge()); preparedStatement.executeUpdate(); } @Override public void close() throws Exception {if(connection ! = null) { try { connection.close(); } catch(Exception e) { e.printStackTrace(); } connection = null; }}}Copy the code
  • Develop test methods
public class JavaCustomSinkToMySQL {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);

        SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {
            @Override
            public Student map(String value) throws Exception {
                System.out.println(value);
                String[] splits = value.split(",");
                Student stu = new Student();
                stu.setId(Integer.parseInt(splits[0]));
                stu.setName(splits[1]);
                stu.setAge(Integer.parseInt(splits[2]));
                return stu;
            }
        });

        studentStream.addSink(new SinkToMySQL());
        env.execute("JavaCustomSinkToMySQL");
    }
}
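The Student POJO referenced above is not shown in the original; a minimal version consistent with the student table (id, name, age) could look like this:

public class Student {

    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}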

6. Flink Table API & SQL programming

You can't expect business users to be able to program, so a good framework provides a Table API that lets people get their work done with SQL.

1. What is the Flink relational API

  • DataSet & DataStream API
    • Users otherwise need to be familiar with two sets of APIs: DataSet and DataStream
      • MapReduce => Hive SQL
      • Spark => Spark SQL
      • Flink => SQL
    • Flink supports both batch and stream processing; how can the two be unified at the API level?
  • Table & SQL API: Relational API

2. Table API & SQL development overview

  • Apache Flink features two relational APIs – the Table API and SQL – for unified stream and batch processing.
  • The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.

3. Table API & SQL programming

  • Prepare a data file
Thpffcj:data Thpffcj$ cat sales.csv
transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,3,3,510.0
114,4,4,600.0
115,1,2,500.0
public class JavaTableSQLAPI {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        String filePath = "file:///Users/thpffcj/Public/data/sales.csv"; // DataSet<Sales> CSV = env.readcsvFile (filePath).ignoreFirstline ().pojotype (sales.class,"transactionId"."customerId"."itemId"."amountPaid");

        // DataSet => Table
        Table sales = tableEnv.fromDataSet(csv);
        // register the Table so it can be referenced by name in SQL
        tableEnv.registerTable("sales", sales);

        // sql
        Table resultTable = tableEnv.sqlQuery(
                "select customerId, sum(amountPaid) money from sales group by customerId");

        DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
        result.print();
    }

    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;
    }
}
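For comparison, the same aggregation can also be written with the language-integrated Table API instead of a SQL string. A hedged sketch in the Flink 1.8 string-expression style, reusing the tableEnv and the registered "sales" table from the example above:

        // same "total paid per customer" query, expressed with Table API operators
        Table salesTable = tableEnv.scan("sales");          // look up the registered table
        Table money = salesTable
                .groupBy("customerId")
                .select("customerId, amountPaid.sum as money");

        tableEnv.toDataSet(money, Row.class).print();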

Finally

You can follow my WeChat official account to learn and make progress together.