Learn how to define data sources in the DataStream API (see the DataStream API Overview page on the Apache Flink website). The official documentation gives four categories:

Collection-based

  • fromCollection(Collection): Creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.
  • fromCollection(Iterator, Class): Creates a data stream from an iterator. The class argument specifies the data type of the elements returned by the iterator.
  • fromElements(T ...): Creates a data stream from the given sequence of objects. All objects must be of the same type.
  • fromParallelCollection(SplittableIterator, Class): Creates a data stream from an iterator, in parallel. The class argument specifies the data type of the elements returned by the iterator.
  • generateSequence(from, to): Generates the sequence of numbers in the given interval, in parallel. Deprecated; use fromSequence instead.
package com.learn.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Stream data - data source - collection-based
 * DataStream - source - collection
 */
public class SourceDemo_Collection {
    public static void main(String[] args) throws Exception {
        // 0: env
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 1: source
        //fromCollection(Collection)
        DataStream<String> ds1 = env.fromCollection(Arrays.asList("hello world", "hello dog"));
        //fromElements(T ...)
        DataStream<String> ds2 = env.fromElements("hello world", "hello dog");
        //fromSequence(from, to)
        DataStream<Long> ds3 = env.fromSequence(1, 100);
        
        // 2. transformation ...

        // 3: sink
        ds1.print();
        ds2.print();
        ds3.print();

        // 4: execute
        env.execute();
    }
}

File-based

  • readTextFile(path): Reads a text file line by line (i.e. a file that complies with the TextInputFormat specification) and returns the lines as strings.
  • readFile(fileInputFormat, path): Reads a file (once) according to the specified fileInputFormat.
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo): This is the method the two above call internally. It reads the files under the given path according to the given fileInputFormat. Depending on the provided watchType, the source either periodically (every interval ms) monitors the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY) or processes the data currently in the path once and exits (FileProcessingMode.PROCESS_ONCE). With the pathFilter, the user can further exclude files from processing. A sketch of the continuous mode follows the next paragraph.

Behind the scenes, Flink splits the file-reading process into two subtasks: directory monitoring and data reading. Each subtask is implemented by a separate entity. Monitoring is performed by a single non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel; the parallelism of the latter equals the parallelism of the job. The role of the single monitoring task is to scan the directory (periodically or only once, depending on the watchType), find the files to be processed, divide them into splits, and assign those splits to the readers. The readers are the ones that read the actual data. Each split is read by exactly one reader, while a reader can read multiple splits, one by one.
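Before the full file example, here is a minimal sketch of the continuous mode, assuming the same data/dir directory used below; the 1000 ms scan interval is an arbitrary value chosen for illustration.

package com.learn.flink.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * Stream data - data source - continuously monitored directory
 * DataStream - source - readFile
 */
public class SourceDemo_ReadFile {
    public static void main(String[] args) throws Exception {
        // 0: env
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1: source - rescan data/dir every 1000 ms and emit data from newly appeared files
        DataStream<String> ds = env.readFile(
                new TextInputFormat(new Path("data/dir")),
                "data/dir",
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000);
        // 3: sink
        ds.print();
        // 4: execute
        env.execute();
    }
}

One caveat from the official docs: in PROCESS_CONTINUOUSLY mode a modified file is re-processed in its entirety, so appending to a file in place can break exactly-once semantics.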

package com.learn.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Stream data - data source - local or HDFS file/directory/compressed file
 * DataStream - source - file
 */
public class SourceDemo_File {
    public static void main(String[] args) throws Exception {
        // 0: env
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 1: source
        DataStream<String> ds1 = env.readTextFile("data/words1");
        DataStream<String> ds2 = env.readTextFile("data/dir");
        DataStream<String> ds3 = env.readTextFile("data/words1.gz"); // Flink transparently decompresses known extensions such as .gz; .rar is not supported

        // 2. transformation

        // 3: sink
        ds1.print();
        ds2.print();
        ds3.print();

        // 4: execute
        env.execute();
    }
}

Socket-based

  • socketTextStream: Reads data from a socket. Elements can be separated by a delimiter.
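To try the example below, first start a plain TCP server on the source host, for example with nc -lk 999 on node01, and type lines of space-separated words into it; the hostname and port simply mirror the code.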
package com.learn.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Stream data - data source - socket
 * DataStream - source - socket
 */
public class SourceDemo_Socket {

    public static void main(String[] args) throws Exception {
        // 0: env
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 1: source
        DataStream<String> ds = env.socketTextStream("node01", 999);

        // 2. transformation
        // split into words
        DataStream<String> words = ds.flatMap((String value, Collector<String> out) -> {
            Arrays.stream(value.split(" ")).forEach(out::collect);
        }).returns(Types.STRING);
        // Each word counts 1
        DataStream<Tuple2<String, Integer>> wordAndOne = words.map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // group
        final KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(value -> value.f0);
        // aggregate
        final SingleOutputStreamOperator<Tuple2<String, Integer>> sum = grouped.sum(1);
        // 3: sink
        sum.print();
        // 4: execute
        env.execute();
    }
}

Custom data source

  • addSource: Attaches a new source function. For example, to read data from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(…)). See Connectors for more details.
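As a rough sketch of what that looks like with the Kafka connector on the classpath (flink-connector-kafka_2.11 for this Flink version; the broker address, group id, and topic name below are made-up placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// inside a job that already has a StreamExecutionEnvironment named env:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node01:9092"); // placeholder broker address
props.setProperty("group.id", "flink-demo");           // placeholder consumer group
DataStream<String> kafkaDS = env.addSource(
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props));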

We can also define our own sources when working with other systems. Flink provides source-function interfaces that we can implement to create custom data sources, classified as follows:

  • SourceFunction: non-parallel data source; parallelism can only be 1
  • RichSourceFunction: multi-functional non-parallel data source; parallelism can only be 1
  • ParallelSourceFunction: parallel data source; parallelism >= 1
  • RichParallelSourceFunction: multi-functional parallel data source; parallelism >= 1. The Kafka data source we study later uses this interface.

Of the four variants above, we generally use RichParallelSourceFunction, which covers the capabilities of the other three.
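For contrast, here is a minimal sketch of the simplest variant, SourceFunction; it would sit alongside the classes in the example below, and the counter values and one-second pacing are arbitrary choices for illustration. Setting the parallelism of such a source to anything other than 1 fails at job submission.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** A non-parallel source that emits an increasing counter once per second. */
public static class MyNumberSource implements SourceFunction<Long> {
    private volatile boolean run = true;
    private long counter = 0;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (run) {
            ctx.collect(counter++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}

The complete example below uses RichParallelSourceFunction instead and generates a random order every second: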

package com.learn.flink.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;


/**
 * Stream data - data source - custom
 * DataStream - source - custom
 */
public class SourceDemo_Custom {

    public static void main(String[] args) throws Exception {
        // 0: env
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 1: source
        final DataStreamSource<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(1);
        // 2. transformation

        // 3: sink
        orderDS.print();
        // 4: execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    /**
     * A custom data source extending RichParallelSourceFunction:
     * a multi-functional parallel source (parallelism can be >= 1)
     */
    public static class MyOrderSource extends RichParallelSourceFunction<Order> {

        private volatile boolean run = true; // volatile: cancel() is called from another thread
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            final Random random = new Random();
            while (run) {
                final String oid = UUID.randomUUID().toString();
                final int uid = random.nextInt(3);
                final int money = random.nextInt(101);
                final long createTime = System.currentTimeMillis();

                sourceContext.collect(new Order(oid, uid, money, createTime));

                // emit one order per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            // once cancel is called, stop generating data
            run = false;
        }
    }
}

A custom data source can also read data from MySQL:

package com.learn.flink.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * Stream data - data source - MySQL
 * DataStream - source - custom data source
 */
public class SourceDemo_Custom_Mysql {

    public static void main(String[] args) throws Exception {
        // 0: env
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 1: source
        final DataStreamSource<Student> orderDS = env.addSource(new MySqlSource()).setParallelism(1);
        // 2. transformation

        // 3: sink
        orderDS.print();
        // 4: execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private String id;
        private String name;
        private Integer age;
    }

    /**
     * A custom data source extending RichParallelSourceFunction:
     * a multi-functional parallel source (parallelism can be >= 1)
     */
    public static class MySqlSource extends RichParallelSourceFunction<Student> {
        private volatile boolean run = true; // volatile: cancel() is called from another thread
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;
        /** open() is executed only once; suitable for opening resources */
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://node01:3306/bigdata", "root", "root");
            String sql = "select id, name, age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> sourceContext) throws Exception {
            while (run) {
                rs = ps.executeQuery(); // assign the field (not a shadowing local) so close() can release it
                while (rs.next()) {
                    final String id = rs.getString("id");
                    final String name = rs.getString("name");
                    final int age = rs.getInt("age");
                    sourceContext.collect(new Student(id, name, age));
                }
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }

        /**
         * close() is suitable for closing resources
         * @throws Exception
         */
        @Override
        public void close(a) throws Exception {
            if (rs != null) rs.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}

The POM file used for the above code is:


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learn</groupId>
    <artifactId>flink-demo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <java.version>11</java.version>
        <flink.version>1.13.2</flink.version>
        <mysql.version>8.0.18</mysql.version>
        <lombok.version>1.18.20</lombok.version>
        <slf4j.version>1.7.25</slf4j.version>
    </properties>

    <!-- specify repository addresses -->
    <repositories>

        <repository>
            <id>aliyun</id>
            <name>maven-aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>maven-public</id>
            <name>maven-public</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </pluginRepository>
    </pluginRepositories>

    <!-- configure dependencies -->
    <dependencies>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <!-- slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-release-plugin</artifactId>
                <version>2.5.3</version>
            </plugin>
            <plugin>
                <artifactId>maven-source-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>jar-no-fork</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>