Flink stream processing basics
Like Storm and Spark, Flink programs run inside an execution environment. The minimal skeleton of a Flink streaming job looks like this:
public class SourceTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Import data
        DataStreamSource<String> stream = env.readTextFile("");
        // Process the data
        stream.print();
        // Execute the job
        env.execute();
    }
}
StreamExecutionEnvironment.getExecutionEnvironment() automatically returns either a local execution environment or a cluster execution environment, depending on how the program is invoked. You can also create one explicitly.
createLocalEnvironment returns a local execution environment; you can specify a default parallelism when calling it:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
createRemoteEnvironment returns a cluster execution environment and submits the job to a remote server. You must specify the JobManager's hostname and port and the Jar package to run on the cluster:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//wordcount.jar");
Source
Reading data from a collection (bounded stream)
First define a data type for the elements of the stream:
// For the data types in a data stream we usually define a dedicated class:
// a POJO in Java or a case class in Scala.
// To group the stream with keyBy(), Flink requires the class
// to follow its POJO rules:
// 1. the class must be public;
// 2. it must have a public no-argument constructor;
// 3. all fields must be public, or be private with public getters and setters.
// Sample type: sensor id, timestamp, temperature
public class SensorReading {
    // Id of the sensor
    private String id;
    // Sensor timestamp
    private Long timestamp;
    // The temperature of the sensor
    private Double temperature;

    // Flink's POJO rules require a public no-argument constructor
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
Now write the main logic:
public class SourceTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection: this is a bounded stream
        DataStreamSource<SensorReading> stream1 = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_7", 1547718202L, 6.7),
                new SensorReading("sensor_10", 1547718205L, 38.1),
                new SensorReading("sensor_15", 1547716505L, 39.1),
                new SensorReading("sensor_18", 1547718685L, 78.1)));
        // Print the data; print() defaults to one subtask per CPU core
        stream1.print();
        // Execute the job
        env.execute("source test");
    }
}
Run the job and wait a few seconds: you will see that the printed order does not match the input order, because print() runs with multiple parallel subtasks.
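The interleaving comes from parallelism, not from Flink reordering your data. A minimal plain-Java sketch of the same effect, with a thread pool standing in for Flink's parallel subtasks (class and method names here are hypothetical, for illustration only):

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderDemo {
    // Fan the input out to a pool of worker threads, the way Flink fans
    // records out to parallel print() subtasks, and collect whatever
    // order results on the shared output
    static List<String> runParallel(List<String> input, int parallelism) throws Exception {
        Queue<String> output = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (String element : input) {
            pool.submit(() -> output.add(element));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return List.copyOf(output);
    }

    public static void main(String[] args) throws Exception {
        List<String> out = runParallel(List.of("a", "b", "c", "d", "e", "f"), 4);
        // Every element arrives exactly once, but the order across
        // threads is nondeterministic
        System.out.println(out.size());
    }
}
```

If you need deterministic output order while experimenting, set the environment's parallelism to 1.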
Read data from a file
This one is simple; the core code is a single call (pass the path of the file to read):
DataStreamSource<String> stream2 = env.readTextFile("");
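readTextFile() yields one String per line, so a common next step is a map() that parses each line into a SensorReading. The parsing itself is plain Java; the CSV layout and class name below are assumptions for illustration:

```java
public class LineParseDemo {
    // Parse one assumed CSV line "id,timestamp,temperature"
    // into the three typed fields of a sensor reading
    static String[] parse(String line) {
        String[] fields = line.split(",");
        String id = fields[0].trim();
        long timestamp = Long.parseLong(fields[1].trim());
        double temperature = Double.parseDouble(fields[2].trim());
        return new String[] {id, Long.toString(timestamp), Double.toString(temperature)};
    }

    public static void main(String[] args) {
        String[] parsed = parse("sensor_1,1547718199,35.8");
        System.out.println(parsed[0] + " " + parsed[1] + " " + parsed[2]);
    }
}
```

In the Flink job this logic would sit inside a map() that builds a SensorReading from the parsed fields.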
Reading data from a Kafka message queue (important)
First add the Kafka connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Configure Kafka information
// Kafka configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092");
properties.setProperty("group.id", "flink-kafka");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
Then read the data:
// Read data from a Kafka topic
DataStreamSource<String> stream3 = env.addSource(new FlinkKafkaConsumer011<String>(
        "sensor",
        new SimpleStringSchema(),
        properties
));
stream3.print();
Flink's addSource() method attaches a data source to the environment. It accepts both predefined sources (such as the Kafka connector above) and custom sources.
Custom Source
Besides the built-in sources above, you can define your own source. All you need to do is pass a SourceFunction to addSource():
DataStreamSource<SensorReading> stream4 = env.addSource(new CustomSource());
The following custom source randomly generates sensor readings:
public class SourceFromCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new CustomSource());
        inputDataStream.print();
        env.execute();
    }

    // A custom source implements SourceFunction (extend RichSourceFunction
    // instead if you need the open()/close() lifecycle hooks)
    public static class CustomSource implements SourceFunction<SensorReading> {
        // Flag checked by the loop; cancel() flips it to stop the source
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            Random random = new Random();
            // Infinite loop that keeps generating data until the job is cancelled
            while (running) {
                // Emit five readings, 100 ms apart
                for (int i = 0; i < 5; i++) {
                    String id = UUID.randomUUID().toString().substring(0, 8);
                    long timestamp = System.currentTimeMillis(); // current time in milliseconds
                    // nextGaussian() returns a pseudo-random Gaussian value
                    // with mean 0.0 and standard deviation 1.0
                    double temperature = 60 + random.nextGaussian() * 20;
                    sourceContext.collect(new SensorReading(id, timestamp, temperature));
                    Thread.sleep(100L);
                }
                // Pause one second between batches
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
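The temperature model above is plain java.util.Random; a small self-contained sketch (class and method names are hypothetical) shows that the generated readings center on the 60-degree baseline with a standard deviation of 20:

```java
import java.util.Random;
import java.util.UUID;

public class TemperatureModelDemo {
    // Same model as the custom source: baseline 60 plus Gaussian noise
    // with standard deviation 20; seeded so the sketch is repeatable
    static double sampleMean(long seed, int n) {
        Random random = new Random(seed);
        double sum = 0;
        for (int i = 0; i < n; i++) {
            sum += 60 + random.nextGaussian() * 20;
        }
        return sum / n;
    }

    public static void main(String[] args) {
        // With 10,000 samples the sample mean sits very close to the baseline
        System.out.println(Math.abs(sampleMean(42L, 10_000) - 60) < 1.0);
        // Sensor ids in the source are 8-character UUID prefixes
        System.out.println(UUID.randomUUID().toString().substring(0, 8).length());
    }
}
```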
Run it and you will see a new batch of random readings printed every second.
This article is reproduced from my personal blog post on Flink stream processing sources and is licensed under CC 4.0 BY-SA.
Questions and discussion are welcome.