The introduction

Hello everyone, I am ChinaManor, which literally translates to "Chinese code farmer". I hope to become a pathfinder on the road of national rejuvenation, a ploughman in the field of big data, and an ordinary person who refuses to be mediocre.

Today I bring you Flink, which Alibaba is pushing hard. Real-time data warehouses are the direction of the future; learn Flink, and a good monthly salary is not a dream!

Related tutorials:

Flink 2021: Flink Quick Start (Overview)

Flink Stream and Batch API (Part 2)

Flink Advanced API (Part 3)

Day02-03_ Stream-batch integration API

Today’s goal

  • A first look at the principles of stream processing

  • Stream-processing concepts (understand)

  • Program structure: the data Source

  • Program structure: data Transformation (master)

  • Program structure: the data Sink (landing the data)

  • Flink Connectors

A first look at the principles of stream processing

  • Flink’s role allocation

    1. JobManager (JobMaster): responsible for cluster management, failure recovery, and checkpoint coordination
    2. TaskManager: the worker, responsible for executing tasks
    3. Client: submits jobs and displays the job page
  • TaskManager execution capability

    1. TaskSlot: a static concept, the execution capacity a TaskManager provides
    2. Parallelism: a dynamic concept, the parallelism a running job actually uses (see the sketch after this list)

  • Each node in the dataflow graph is a task

    Each task is split into multiple parallel instances running in separate threads; these parallel instances are called subtasks

  • StreamGraph: the logical dataflow graph (DataFlow)

    Operator chain: adjacent operators can be chained together into one task

  • JobGraph: the optimized graph submitted to the JobManager

    ExecutionGraph: the parallelized physical execution plan

  • Event: each event carries a timestamp

  • Data transfer between operators: one-to-one mode and redistributing mode

  • Execution diagram of Flink
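
The sketch below is a minimal example of my own (not part of the original course code) showing how these concepts appear in the API: job-level parallelism is the dynamic setting, each operator runs as parallel subtasks, and startNewChain() controls operator chaining.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismAndChainingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Job-level parallelism: the default number of subtasks per operator (the dynamic concept).
            env.setParallelism(3);

            env.fromSequence(1, 100)
                    .map(new MapFunction<Long, Long>() {
                        @Override
                        public Long map(Long value) {
                            return value * 2;
                        }
                    })
                    // Operator-level parallelism overrides the job-level setting for this map only.
                    .setParallelism(2)
                    // startNewChain() begins a new operator chain at this map;
                    // operators in one chain run in the same task (thread).
                    .startNewChain()
                    .print();

            env.execute("parallelism and chaining sketch");
        }
    }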

Stream-processing concepts

Timeliness of data

  • The emphasis is on the timeliness of data processing

    Processing-time windows can be defined by month, day, hour, or second
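
    As a quick illustration (a minimal sketch of my own, not part of the course code; the window API itself is covered in later lessons), a processing-time tumbling window bounds the unbounded stream by time so that a timely result can be emitted per window. The host name node1 is an assumption about your test environment.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class ProcessingTimeWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // An unbounded source: lines typed into a socket (e.g. nc -lk 9999 on node1)
            DataStream<String> lines = env.socketTextStream("node1", 9999);

            lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String value) throws Exception {
                            return Tuple2.of(value, 1);
                        }
                    })
                    .keyBy(t -> t.f0)
                    // A 5-second tumbling processing-time window: one result per key per window
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .sum(1)
                    .print();

            env.execute("processing-time window sketch");
        }
    }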

Stream processing and batch processing

  • Batch processing works on bounded data

    • Processes complete data sets, e.g. sorting data, computing global state, producing a final output summary.
    • Batch computing: collect the data first, store it in a DB, then process it in batches
  • Stream processing works on unbounded data

    • Window operations draw boundaries in the data so it can be computed
    • Streaming computing, as the name suggests, processes data as a continuous stream
  • Flink 1.12 supports both stream and batch execution.

  • Stream-batch unification: Flink 1.12.x handles batch and stream processing with one API (see the sketch after this list)

    • Reusability: Jobs can be switched between stream mode and batch mode without rewriting any code.
    • Simple maintenance: A unified API means streams and batches can share the same set of connectors and maintain the same set of code.
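
    A minimal sketch of my own (based on the runtime-mode switch that the later examples also use) of choosing between stream and batch execution for the same program:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RuntimeModeSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // STREAMING: classic unbounded execution
            // BATCH: bounded execution with batch-style scheduling
            // AUTOMATIC: let Flink decide based on whether all sources are bounded
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

            env.fromElements("hello world", "hello flink") // a bounded source
                    .print();

            env.execute("runtime mode sketch");
        }
    }

    The mode can also be chosen at submission time with -Dexecution.runtime-mode=BATCH, so the job code itself does not need to change.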

Programming model

  • Source – reads the data source
  • Transformation – data transformations: map, flatMap, groupBy/keyBy, sum
  • Sink – writes (lands) the data: addSink, print

Source

Source based on collections

  • Used for development and testing

  • Classification

    1. env.fromElements(varargs)
    2. env.fromCollection(a Collection)
    3. env.generateSequence(start, end)   // deprecated
    4. env.fromSequence(start, end)
  • Using the collection Source

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    
    /**
     * Author itcast
     * Date 2021/6/16 9:29
     * 1. Create a stream environment
     * 2. Read data from the collection
     * 3. Print the result
     * 4. Execute the job
     */
    public class SourceDemo01 {
        public static void main(String[] args) throws Exception {
            //1. Create a stream environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2. Read data from the collection
            //2.1 fromElements - from individual elements
            DataStreamSource<String> source1 = env.fromElements("hello world", "hello spark", "hello flink");
            //2.2 fromCollection - from a list
            ArrayList<String> strings = new ArrayList<>();
            strings.add("hello world");
            strings.add("hello flink");
            DataStreamSource<String> source2 = env.fromCollection(strings);
            //2.3 fromSequence
            DataStreamSource<Long> source3 = env.fromSequence(1, 10);
            //3. Print the source
            source1.print();
            //4. Execute
            env.execute();
        }
    }
  • Socket data source: WordCount statistics

    /**
     * Author itcast
     * Desc SocketSource
     */
    public class SourceDemo03 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            //2.source
            DataStream<String> linesDS = env.socketTextStream("node1", 9999);

            //3. Transformation
            //3.1 Split each line into words on spaces
            DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    //value is a line of data
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(word); // Collect each word and emit it
                    }
                }
            });
            //3.2 Map each word to (word, 1)
            DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    //value is a single word
                    return Tuple2.of(value, 1);
                }
            });
            //3.3 Group the data by key
            //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
            KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
            //3.4 Aggregate within each group by summing the counts
            DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

            //4. Output the result - sink
            result.print();

            //5. Trigger execution
            env.execute();
        }
    }
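
    To try this example, first start a socket server on the node1 host, for example with `nc -lk 9999` (netcat), then type lines of words; each line is processed as soon as it arrives. Replace node1 with a host name that exists in your own environment.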
  • Custom data source – Random data

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Author itcast
     * Date 2021/6/16 10:18
     * Generate order information every 1 second.
     * Requirements:
     * - Randomly generated order ID (UUID)
     * - Randomly generated user ID (0-2)
     * - Randomly generated order amount (0-100)
     * - Timestamp is the current system time
     *
     * SourceFunction: non-parallel data source (parallelism can only be 1)
     * ParallelSourceFunction: parallel data source (parallelism can be >= 1)
     * RichSourceFunction: non-parallel data source with rich functions (open/close, runtime context)
     * RichParallelSourceFunction: parallel data source with rich functions (parallelism can be >= 1);
     * the Kafka data source studied later uses this interface
     */
    public class CustomSource01 {
        public static void main(String[] args) throws Exception {
            //1. Create the StreamExecutionEnvironment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2. Source: create a source that automatically generates Order data
            DataStreamSource<Order> source = env.addSource(new MyOrderSource());
            //3. Print the data source
            source.print();
            //4. Execute
            env.execute();
            // Define entity class Order with four fields: oid, uid, money, currentTime
            // Define a static inner class MyOrderSource that extends RichParallelSourceFunction
            // Generate order information randomly every 1 second (order ID, user ID, order amount, timestamp)
            // Requirements:
            //- Generate order ID(UUID) randomly
            //- Generate user ID randomly (0-2)
            //- Randomly generated order amount (0-100)
            //- The timestamp is the current system time
        }
    
        public static class MyOrderSource extends RichParallelSourceFunction<Order> {
            boolean flag = true;
            Random rn = new Random();
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                // Generate order information randomly every 1 second (order ID, user ID, order amount, timestamp)
                // Requirements:
                while(flag) {
                    //- Generate order ID(UUID) randomly
                    String oid = UUID.randomUUID().toString();
                    //- Generate user ID randomly (0-2)
                    int uid = rn.nextInt(3);
                    //- Randomly generated order amount (0-100)
                    int money = rn.nextInt(101);
                    //- The timestamp is the current system time
                    long currentTime = System.currentTimeMillis();
                    ctx.collect(new Order(oid,uid,money,currentTime));
                    // Take a break for a second
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        }

        // The Order entity class
        @AllArgsConstructor
        @NoArgsConstructor
        @Data
        public static class Order{
            private String oid;
            private int uid;
            private int money;
            private long currentTime;
        }
    }
  • Custom data source: read data from the t_student table in a MySQL database (a custom source like this is rarely needed in practice)

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    
    /**
     * Author itcast
     * Date 2021/6/16 10:37
     * 1. env: set parallelism to 1
     * 2. source: create a connection to the MySQL data source, generate data every 5 seconds
     * 3. print the data source
     * 4. execute
     * Steps:
     * - Create a static inner class Student with fields id:int, name:String, age:int
     * - Create a static inner class MySQLSource that extends RichSourceFunction<Student>
     * - Implement the open method: create the Connection and PreparedStatement
     *   (MySQL 5.7, jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false)
     * - Implement the run method: emit data every 5 seconds
     * - Implement the close method
     */
    public class CustomSourceMySQL {
        public static void main(String[] args) throws Exception {
            //1. env: set parallelism to 1
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2. Source: create a connection to the MySQL data source, generate data every 5 seconds
            DataStreamSource<Student> source = env.addSource(new MySQLSource());
            //3. Print the data source
            source.print();
            //4. Execute
            env.execute();
            // Create a static inner class Student with fields id:int, name:String, age:int
            // Create a static inner class MySQLSource that extends RichSourceFunction<Student>
            // Implement the open method: create the Connection and PreparedStatement
            // Get the database connection (MySQL 5.7)
            // jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false
            // Implement the run method: emit data every 5 seconds
            // Implement the close method
        }
        public static class MySQLSource extends RichSourceFunction<Student> {
            boolean flag = true;
            Connection conn = null;
            PreparedStatement ps = null;
            // open: called once at the start of the lifecycle
            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection("jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false", "root", "123456");
                String sql = "select id,name,age from t_student";
                // Prepare the statement
                ps = conn.prepareStatement(sql);
            }
    
            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                while(flag){
                    // Query the result set
                    ResultSet rs = ps.executeQuery();
                    while(rs.next()){
                        int id = rs.getInt("id");
                        String name = rs.getString("name");
                        int age = rs.getInt("age");
                        ctx.collect(new Student(id,name,age));
                        Thread.sleep(5000);
                    }
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
    
            // Close the database only once in the entire lifecycle
            @Override
            public void close() throws Exception {
                if (!ps.isClosed()) ps.close();
                if (!conn.isClosed()) conn.close();
            }
        }
        // The Student entity class
        @AllArgsConstructor
        @NoArgsConstructor
        @Data
        public static class Student{
            private int id;
            private String name ;
            private int age;
        }
    }
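
    Note that this example assumes a MySQL database named bigdata reachable at 192.168.88.163:3306 containing a table t_student(id, name, age), and that the MySQL JDBC driver is on the classpath; adjust the URL, user name, and password to your own environment.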

Merge and split

  • Merging data streams: combine two data streams into one data stream

  • Application scenarios

    ① Collect, analyze, and mine all orders coming from different sources: PCs, apps, iPads, and WeChat mini-programs

    ② Collect and statistically analyze the behavior trajectories of users on different handheld devices and computers

  • union vs connect

    Both union and connect merge streams.

    The union operator requires the merged data streams to have the same type.

    The connect operator allows the two data streams to have different types.

  • Requirement: Merge two data streams together

    /** * Author itcast * Desc */
    public class TransformationDemo02 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //2.Source
            DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
            DataStream<String> ds2 = env.fromElements("oozie", "flume", "flink");
            DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
    
            //3.Transformation
            // union requires the two data streams to have the same type
            DataStream<String> result1 = ds1.union(ds2); // merges without deduplication: https://blog.csdn.net/valada/article/details/104367378
            // connect allows the two data streams to have different types
            ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);
            //interface CoMapFunction<IN1, IN2, OUT>
            DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {
                @Override
                public String map1(String value) throws Exception {
                    return "String->String:" + value;
                }
    
                @Override
                public String map2(Long value) throws Exception {
                    return "Long->String:"+ value.toString(); }});//4.Sink
            //result1.print();
            result2.print();
    
            //5.execute
            env.execute();
        }
    }

Split: select and side output

  • Divide a data stream into multiple data streams

  • Application scenarios

    ① Splitting server logs into normal logs, alarm logs, and error logs

  • Requirements – Split the data stream into even and odd numbers

  • Development steps

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    /**
     * Author itcast
     * Date 2021/6/16 11:29
     * 1. env
     * 2. Source: for example, the numbers between 1 and 20
     * 3. Define two output tags, one odd and one even; process the stream and get the two side output streams
     * 4. sink: print the output
     * 5. execute
     */
    public class SplitDataStream {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2.Source for example, a number between 1 and 20
            DataStreamSource<Long> source = env.fromSequence(1, 21);
            // Define two side output streams, one odd and one even, with type Long
            // The corresponding data type needs to be specified. By default, the OutputTag uses a generic type, which needs to be manually specified
            OutputTag<Long> odd = new OutputTag<Long>("odd", TypeInformation.of(Long.class));
            OutputTag<Long> even = new OutputTag<Long>("even", TypeInformation.of(Long.class));
            // Process the source data to distinguish between odd and even numbers
            SingleOutputStreamOperator<Long> result = source.process(new ProcessFunction<Long, Long>() {
                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                    if (value % 2 == 0) {
                        ctx.output(even, value);
                    } else {
                        ctx.output(odd, value);
                    }
                }
            });
            //3. Get the two side output streams
            //result.print();
            result.getSideOutput(even).print("Even");
            result.getSideOutput(odd).print("Odd");
            //4. Sink prints output
            //5.execute
            env.execute();
        }
    }

Rebalance data

  • The data is evenly distributed to each node, and the calculation is more uniform.

  • Requirement: take the numbers 1-100, keep the 90 numbers greater than 10, and process them evenly with 3 threads

  • code

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * Author itcast
     * Date 2021/6/17 15:00
     */
    public class RebalanceDemo {
        public static void main(String[] args) throws Exception {
            //1. env: set parallelism to 3
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(3);
            //2.source fromSequence 1-100
            DataStreamSource<Long> source = env.fromSequence(1, 100);
            //3.Transformation
            // Without repartitioning, the data may be distributed unevenly (data skew); filter keeps the numbers greater than 10
            DataStream<Long> filterDS = source.filter(s -> s > 10);
            // change Long data to tuple2(partition number/subtask number, 1) using map
            /*
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapDS = filterDS
                    .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                        @Override
                        public Tuple2<Integer, Integer> map(Long value) throws Exception {
                            // Obtain the subtask index through getRuntimeContext
                            int idx = getRuntimeContext().getIndexOfThisSubtask();
                            return Tuple2.of(idx, 1);
                        }
                    });
            // Group by subtask id/partition id and count how many elements each subtask/partition receives
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = mapDS
                    .keyBy(i -> i.f0)
                    // Group and aggregate the current data stream by key
                    .sum(1);
            */
            // Perform the same operations, but call rebalance() after the filter and before the map
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapDS = filterDS
                    .rebalance()
                    .map(new RichMapFunction<Long, Tuple2<Integer/* subtask index */, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(Long value) throws Exception {
                    // Get the task Index through getRuntimeContext
                    int idx = getRuntimeContext().getIndexOfThisSubtask();
                    // return Tuple2(task Index,1)
                    return Tuple2.of(idx, 1);
                }
            });
            // Count the number of elements in each subtask/partition, grouped by subtask id/partition id
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = mapDS.keyBy(i -> i.f0)
                    // Group and aggregate the current data flow by key
                    .sum(1);
            //4.sink
            //result1.print(" No repartition ");
            result2.print("Repartition");
            //5.execute
            env.execute();
        }
    }

Sink

Predefined Sink


/**
 * Author itcast
 * Desc
 * 1. ds.print() prints directly to the console
 * 2. ds.printToErr() prints directly to the console, in red
 * 3. ds.collect() gathers the distributed data into a local collection
 * 4. ds.setParallelism(1).writeAsText("local/HDFS path", WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");

        //3.transformation
        //4.sink
        ds.print();
        ds.printToErr();
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        // Note:
        // Parallelism = 1: the output is a single file
        // Parallelism > 1: the output is a folder

        //5.execute
        env.execute();
    }
}

Custom Sink

  • Requirement

    Write data from a collection into MySQL

  • Development steps

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    /** * Author itcast * Date 2021/6/17 15:43 * Desc TODO */
    public class CustomSinkMySQL {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.Source
            DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
            //3.Transformation
            //4.Sink
            studentDS.addSink(new MySQLSink());
    
            //5.execute
            env.execute();
        }
    
        // Implement RichSinkFunction to insert data into t_student table in MySQL
        public static class MySQLSink extends RichSinkFunction<Student>{
            Connection conn ;
            PreparedStatement ps;
            // Connect to the database
            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection("jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false", "root", "123456");
                String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
                ps = conn.prepareStatement(sql);
            }
    
            // Insert the data into the database
            @Override
            public void invoke(Student value, Context context) throws Exception {
                ps.setString(1,value.name);
                ps.setInt(2,value.age);
                ps.executeUpdate();
            }
    
            // Close the database
            @Override
            public void close() throws Exception {
                if (!ps.isClosed()) ps.close();
                if (!conn.isClosed()) conn.close();
            }
        }
    
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class Student {
            private Integer id;
            private String name;
            private Integer age;
        }
    }

Connector

  • Flink's official connectors are used to connect to external systems such as JDBC databases, Kafka, MQ, etc.

JDBC Connection mode

  • Requirement: Store data elements to MySQL database via JDBC

    /** * Author itcast * Date 2021/6/17 15:59 * Desc TODO */
    public class JDBCSinkMySQL {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.Source
            DataStreamSource<Student> source = env.fromElements(new Student(null, "JackMa", 42));
            //3. Insert data into mysql database through JDBC
            source.addSink(JdbcSink.sink(
                    // The insert SQL statement to execute
                    "INSERT INTO t_student(id,name,age) values (null, ?, ?)",
                    // Set the statement parameters for each record
                        (ps, student) -> {
                            ps.setString(1,student.name);
                            ps.setInt(2,student.age);
                        },
                    // Execution options:
                    // set parameters such as the batch size, flush interval, and retries
                    JdbcExecutionOptions.builder()
                            .withBatchSize(1000)
                            .withBatchIntervalMs(200)
                            .withMaxRetries(5)
                            .build(),
                    //4. Set the connection parameters
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?useSSL=false")
                            .withUsername("root")
                            .withPassword("123456")
                            .withDriverName("com.mysql.jdbc.Driver")
                            .build()));
            //5. Execution environment
            env.execute();
        }
    
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class Student {
            private Integer id;
            private String name;
            private Integer age;
        }
    }
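
    These JdbcSink examples assume that the flink-connector-jdbc dependency (matching your Flink version) and the MySQL JDBC driver are on the classpath; JdbcSink, JdbcExecutionOptions, and JdbcConnectionOptions come from the org.apache.flink.connector.jdbc package.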

Kafka connection mode

  • Kafka is a message queue

  • Requirement:

    Use Flink to write data elements into Kafka

    package cn.itcast.flink.sink;
    
    import com.alibaba.fastjson.JSON;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    import org.apache.kafka.clients.producer.ProducerConfig;
    
    import java.util.Properties;
    
    /**
     * Author itcast
     * Date 2021/6/17 16:46
     */
    public class KafkaProducerDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.Source generates the element Student
            DataStreamSource<Student> studentDS = env.fromElements(new Student(102, "Oking", 25));
            //3.Transformation
            // Note: for now, the serialization and deserialization we use with Kafka are directly using the simplest string, so convert Student to a string first
            // The 3.1 map method converts Student to a string
            SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
                @Override
                public String map(Student value) throws Exception {
                    // Convert the Student object to a JSON string
                    String json = JSON.toJSONString(value);
                    return json;
                }
            });
            //4.Sink
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.161:9092");
            // Instantiate FlinkKafkaProducer based on arguments
            // If you don't need complex parameter settings and only want to store the data in the Kafka message queue, use the first overloaded constructor
            // If you need to set up complex kafka configurations, use overloaded methods other than the first one
            // If you need to set Semantic only once, use the last two
            /* FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                    "192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092",
                    "flink_kafka",
                    new SimpleStringSchema()); */
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                    "flink_kafka".new KafkaSerializationSchemaWrapper(
                            "flink_kafka".new FlinkFixedPartitioner(),
                            false.new SimpleStringSchema()
                    ),
                    props,
                    // Commit data with exactly-once semantics
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE
            );
    
            mapDS.addSink(producer);
            // ds.addSink drops to the kafka cluster
            //5.execute
            env.execute();
            // Test: /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
        }
    
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class Student {
            private Integer id;
            private String name;
            private Integer age;
        }
    }

Consume data from a Kafka cluster

  • Requirement

    Read data from Kafka and print it to the console

  • Development steps

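    A minimal consumer sketch of my own (not the original course code) that reads the flink_kafka topic with FlinkKafkaConsumer and prints it to the console. The broker address and topic mirror the producer example above; the package name and the group id flink_consumer are arbitrary illustrative values.

    package cn.itcast.flink.source;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    public class KafkaConsumerDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.Source: read strings from the flink_kafka topic
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.161:9092");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                    "flink_kafka",
                    new SimpleStringSchema(),
                    props
            );
            // Start from the latest records; setStartFromEarliest() would replay the whole topic
            consumer.setStartFromLatest();
            DataStreamSource<String> source = env.addSource(consumer);
            //3.Transformation: none, the records are passed through as-is
            //4.Sink: print to the console
            source.print();
            //5.execute
            env.execute();
            // Test: /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
        }
    }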

Flink writes to the Redis database

  • Redis is an in-memory database that supports caching and persistence

  • Usage scenarios

    1. Hot-data processing and caching
    2. Deduplication
    3. Five data types: String, Hash, Set, ZSet, List
  • Requirements:

    Data is written to Redis via Flink

    package cn.itcast.flink.sink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    import org.apache.flink.util.Collector;
    
    /**
     * Receive messages and do WordCount,
     * finally saving the result to Redis.
     * Note: the data structure stored in Redis is a hash (i.e. a map):
     *   key          value
     *   WordCount    (word, count)
     */
    public class ConnectorsDemo_Redis {
        public static void main(String[] args) throws Exception {
            //1. env: the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.Source Reads data from the socket
            DataStream<String> linesDS = env.socketTextStream("192.168.88.163", 9999);
    
            //3.Transformation
            //3.1 cut and mark as 1
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });
            //3.2 Group by word
            KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
            //3.3 Aggregate
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
    
            //4.Sink
            result.print();
            // * Save the result to Redis
            // * Note: data structures stored in Redis: use hash (map)
            // * key value
            // * WordCount (word, number)
    
            //-1. Before creating RedisSink, create RedisConfig
            // Connect to the standalone Redis
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                    .setHost("192.168.88.163")
                    .setDatabase(2)
                    .build();
            //-3. Create and use RedisSink
            result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
    
            //5.execute
            env.execute();
        }
    
        /** * -2. Define a Mapper to specify the data structure stored in Redis */
        public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // Which data type to use, key:WordCount
                return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
            }
            @Override
            public String getKeyFromData(Tuple2<String, Integer> data) {
                // The key to store the data
                return data.f0;
            }
            @Override
            public String getValueFromData(Tuple2<String, Integer> data) {
                // Store the value of the data
                return data.f1.toString();
            }
        }
    }

Problems encountered

  • VMware: an image file from 15.5.x was opened and upgraded to 16.1.0

  • fromSequence(1,10) on a 12-thread CPU reports "from <= to"

    The error occurs when the configured parallelism is greater than the amount of generated data: with parallelism 12 but only 10 generated numbers, this error is reported (see the sketch after this list).

  • Flink Standalone HA (high availability)

    Check the jobmanager log when troubleshooting
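
    A minimal sketch (my own, not from the original notes) of the workaround: lower the parallelism, or generate at least as many numbers as there are parallel subtasks.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FromSequenceParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // On a 12-thread CPU the default parallelism is 12, which is more than the 10 generated numbers.
            // Lowering the parallelism (or generating at least 12 numbers) avoids the "from <= to" complaint.
            env.setParallelism(1);
            env.fromSequence(1, 10).print();
            env.execute("fromSequence parallelism sketch");
        }
    }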

Conclusion

This is part 5 of the 2021 Flink tutorial.

I hope you take something away from this article; if you found it helpful, please give it a like, a favorite, and a follow ~