
The basic structure of a Flink stream processing program

Like Storm and Spark, Flink programs are built around an execution environment. The minimal program skeleton looks like this:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Import data
        DataStreamSource<String> stream = env.readTextFile("");
        // Process the data
        stream.print();
        // Trigger execution; nothing runs until execute() is called
        env.execute();
    }
}

StreamExecutionEnvironment.getExecutionEnvironment() automatically returns a local execution environment or a cluster execution environment, depending on the context in which the program runs; both kinds can also be created explicitly.
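Settings such as the default parallelism can also be configured on the environment. A minimal sketch (the value 4 is just an illustration):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the default parallelism for all operators of this job
env.setParallelism(4);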

createLocalEnvironment returns a local execution environment; the default parallelism is specified when it is called:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

createRemoteEnvironment returns a cluster execution environment and submits the job to a remote server. The call specifies the host name (or IP address) and port of the JobManager, as well as the jar package to run on the cluster:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//wordcount.jar");

Source

Reading data from a collection (finite stream)

Start by defining a data type class for the source records.

// For the data types in a data stream, we usually define a data type class:
// a POJO in Java or a case class in Scala.

// For the POJO approach: when we want to group with keyBy,
// the POJO class definition must meet the following conditions:
// 1. The class must be declared public;
// 2. It must have a public no-argument (default) constructor;
// 3. All fields must be public, or accessible through public getters and setters.

// Define the sample type: sensor ID, timestamp, temperature
public class SensorReading {
    // Id of the sensor
    private String id;
    // Timestamp of the reading
    private Long timestamp;
    // Temperature measured by the sensor
    private Double temperature;

    // A public no-argument constructor is required for Flink to treat this class as a POJO
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}

Write the main logic code

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection; this is a bounded data stream
        DataStreamSource<SensorReading> stream1 = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_7", 1547718202L, 6.7),
                new SensorReading("sensor_10", 1547718205L, 38.1),
                new SensorReading("sensor_15", 1547716505L, 39.1),
                new SensorReading("sensor_18", 1547718685L, 78.1)));
        // Print the data; the default parallelism is the number of CPU cores
        stream1.print();
        // Execute
        env.execute("source test");
    }
}

Run it, wait a few seconds, and look at the output.

You can see that the output order differs from the input order: print() runs with one subtask per CPU core by default, and the outputs of the parallel subtasks interleave. Setting the environment's parallelism to 1 would make the order deterministic.
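Because SensorReading satisfies the POJO conditions above, the stream can be grouped by field name with keyBy. A minimal sketch (the rolling maximum on temperature is just an illustration):

// Group by the sensor id field and keep a running maximum temperature per sensor
stream1
        .keyBy("id")
        .max("temperature")
        .print();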

Read data from a file

This is straightforward; the core code is:

DataStreamSource<String> stream2 = env.readTextFile("");
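In practice, the lines of a text file usually have to be parsed into the data type. A sketch, assuming a hypothetical file sensor.txt whose lines look like sensor_1,1547718199,35.8:

// The path is hypothetical; adjust it to where the file actually lives
DataStream<SensorReading> sensorStream = env
        .readTextFile("src/main/resources/sensor.txt")
        .map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });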

Reading source data from a Kafka message queue (important)

Add the Kafka connector dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Configure Kafka information

// Kafka configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092");
properties.setProperty("group.id", "flink-kafka");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

Read the data

// Read data from a Kafka topic
DataStreamSource<String> stream3 = env.addSource(new FlinkKafkaConsumer011<String>(
        "sensor",
        new SimpleStringSchema(),
        properties
));
stream3.print();
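Note that auto.offset.reset only applies when the consumer group has no committed offsets; the start position can also be chosen explicitly in code. A minimal sketch using FlinkKafkaConsumer011:

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
        "sensor", new SimpleStringSchema(), properties);
// Ignore committed offsets and always read the topic from the beginning
consumer.setStartFromEarliest();
DataStreamSource<String> kafkaStream = env.addSource(consumer);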

Flink's addSource() method is the general way to attach a data source to a job: it accepts Flink's predefined sources (such as the Kafka consumer above) as well as custom sources.

Custom Source

In addition to the sources above, we can also define our own. All that is needed is to pass a SourceFunction to addSource(). The call looks like this:

DataStreamSource<SensorReading> stream4 = env.addSource(new CustomSource());

A custom source that randomly generates sensor data is defined below:

import java.util.Random;
import java.util.UUID;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SourceFromCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new CustomSource());
        inputDataStream.print();
        env.execute();
    }

    // A custom source implements SourceFunction (a RichSourceFunction can also be extended)
    public static class CustomSource implements SourceFunction<SensorReading> {
        // volatile, because cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            Random random = new Random();
            // An infinite loop that continuously generates data until the source is canceled
            while (running) {
                // Emit 5 readings, one every 100 milliseconds
                for (int i = 0; i < 5; i++) {
                    String id = UUID.randomUUID().toString().substring(0, 8);
                    long timestamp = System.currentTimeMillis();  // current time in milliseconds
                    // nextGaussian() returns a pseudo-random Gaussian double with mean 0.0
                    // and standard deviation 1.0, so temperatures center around 60
                    double temperature = 60 + random.nextGaussian() * 20;
                    sourceContext.collect(new SensorReading(id, timestamp, temperature));

                    Thread.sleep(100L);
                }

                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Running it prints a continuous stream of randomly generated SensorReading records.
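One detail worth noting: a source built on a plain SourceFunction always runs with parallelism 1. To run a source in parallel, implement ParallelSourceFunction (or extend RichParallelSourceFunction) instead. A minimal sketch:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

// Every parallel subtask runs its own copy of run()
public static class CustomParallelSource implements ParallelSourceFunction<SensorReading> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> sourceContext) throws Exception {
        while (running) {
            // The reading emitted here is a placeholder; generate data as in CustomSource
            sourceContext.collect(new SensorReading("sensor_p", System.currentTimeMillis(), 36.6));
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}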
