transform
Purpose: convert the source data into the data you need
Commonly used functions
map
The map operator is similar to Python's map function: it applies a function to every element and emits exactly one output element per input element. In Flink the transformation is specified by passing a MapFunction to the map() method. The general form for converting one data type (input) into another data type (output) is as follows:
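dataStream.map(new MapFunction<Input, Output>() {
    @Override
    public Output map(Input value) throws Exception {
        // Input, Output and convert() are placeholders for your own types and logic
        return convert(value);
    }
});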
Pictured as a diagram, the rectangle becomes an ellipse but keeps its color: the form of the data changes while what it represents does not.
flatMap
flatMap is the flattening operator: one input element can produce zero, one, or many output elements. For example, the string "hello,word" is emitted as Tuple2("hello", 1) and Tuple2("word", 1):
inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Split the line on commas and emit one (word, 1) pair per word
        String[] words = input.split(",");
        for (String word : words) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
});
filter
The filter operator applies a boolean test to every input element: elements for which the test returns true are kept, and elements for which it returns false are dropped.
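To make the true/false rule concrete, here is a minimal plain-Java sketch of the same idea. It is not Flink code: the class FilterSketch and the 60-degree rule are made up for illustration, and java.util.stream stands in for the data stream. In Flink the same test would go inside the filter method of a FilterFunction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java analogue of a filter step; not Flink code.
final class FilterSketch {
    // Keeps only the elements for which the predicate returns true.
    static <T> List<T> filter(List<T> input, Predicate<T> keep) {
        return input.stream().filter(keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Double> temperatures = Arrays.asList(35.5, 60.0, 72.3, 18.9);
        // Hypothetical rule: keep readings at or above 60 degrees.
        List<Double> kept = filter(temperatures, t -> t >= 60);
        System.out.println(kept); // prints [60.0, 72.3]
    }
}
```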
keyBy (grouping)
DataStream → KeyedStream: logically splits a stream into disjoint partitions, each containing the elements that share the same key. Internally this is implemented with hash partitioning.
Usage: dataStream.keyBy(param)
param: the position of the key field, counted from 0, or the name of the key field, such as "id"
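The hashing idea can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions: KeyBySketch, partitionFor and the sensor ids are hypothetical names, and Flink's real key-to-partition mapping is more involved than a hashCode modulo. The point it shows is that equal keys always land in the same partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of hash partitioning by key; not Flink code.
final class KeyBySketch {
    // Maps a key to one of `partitions` buckets. Equal keys always get the same bucket.
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // Distributes the keys over the buckets according to partitionFor.
    static List<List<String>> partition(List<String> keys, int partitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String key : keys) {
            buckets.get(partitionFor(key, partitions)).add(key);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("sensor_1", "sensor_2", "sensor_1", "sensor_3");
        // Both "sensor_1" entries end up in the same inner list.
        System.out.println(partition(ids, 2));
    }
}
```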
Rolling Aggregation
- sum: the running sum of a field for each key
- max: the running maximum value of a field for each key
- min: the running minimum value of a field for each key
- minBy: the element that holds the minimum value of a field in the KeyedStream
- maxBy: the element that holds the maximum value of a field in the KeyedStream
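The difference between max and maxBy is easy to miss, so here is a small plain-Java sketch for a single key. It is not Flink code, and the names MaxVsMaxBySketch and Reading are hypothetical. It assumes that max only tracks the aggregated field and keeps the other fields of the first element, while maxBy keeps the whole element that holds the maximum; check this against the Flink version you use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of max versus maxBy for one key; not Flink code.
final class MaxVsMaxBySketch {
    // Hypothetical reading type with a timestamp and a temperature.
    static final class Reading {
        final long timestamp;
        final double temperature;

        Reading(long timestamp, double temperature) {
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "(" + timestamp + ", " + temperature + ")";
        }
    }

    // max-like: track the largest temperature but keep the first element's timestamp.
    static List<Reading> rollingMax(List<Reading> input) {
        List<Reading> emitted = new ArrayList<>();
        Reading current = null;
        for (Reading r : input) {
            current = (current == null)
                    ? r
                    : new Reading(current.timestamp, Math.max(current.temperature, r.temperature));
            emitted.add(current);
        }
        return emitted;
    }

    // maxBy-like: keep the whole element that holds the largest temperature.
    static List<Reading> rollingMaxBy(List<Reading> input) {
        List<Reading> emitted = new ArrayList<>();
        Reading current = null;
        for (Reading r : input) {
            if (current == null || r.temperature > current.temperature) {
                current = r;
            }
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading(1L, 35.0), new Reading(2L, 70.0), new Reading(3L, 50.0));
        System.out.println(rollingMax(readings));   // last element: (1, 70.0)
        System.out.println(rollingMaxBy(readings)); // last element: (2, 70.0)
    }
}
```

Both versions emit one result per input element, which is why they are called rolling aggregations.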
Reduce (complex aggregation)
KeyedStream → DataStream: merges the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not just the final one. Example: for each sensor id, keep the highest temperature seen so far together with the timestamp of the most recent reading.
import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_keyed_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The data source is the custom generated sensor data
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        // For each sensor id, keep the maximum temperature so far together with the latest timestamp
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
                .reduce(new CustomReduceFunction());
        resultDataStream.print();
        env.execute();
    }

    public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
        @Override
        public SensorReading reduce(SensorReading sensorReading, SensorReading input) throws Exception {
            // sensorReading is the previous aggregation result, input is the newly arrived element
            String id = sensorReading.getId();
            Long timestamp = input.getTimestamp();
            // Keep the larger of the previous and the current temperature
            double temperature = Math.max(sensorReading.getTemperature(), input.getTemperature());
            return new SensorReading(id, timestamp, temperature);
        }
    }
}
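To see why reduce returns the result of every step rather than only the last one, the following plain-Java sketch replays the same logic on a small list. It is not Flink code: RollingReduceSketch, its Reading class and keyedReduce are hypothetical stand-ins, and a HashMap plays the role of the per-key state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java illustration of a keyed rolling reduce; not Flink code.
final class RollingReduceSketch {
    // Hypothetical stand-in for the SensorReading type used above.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "@" + timestamp + "=" + temperature;
        }
    }

    // Same rule as the reduce function above: latest timestamp, maximum temperature.
    static Reading latestWithMaxTemperature(Reading previous, Reading input) {
        return new Reading(previous.id, input.timestamp,
                Math.max(previous.temperature, input.temperature));
    }

    // Applies the reducer per key and records the result after every element.
    static <K, T> List<T> keyedReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> statePerKey = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T element : input) {
            // merge stores the element for a new key, otherwise reducer(previous, element)
            emitted.add(statePerKey.merge(keyOf.apply(element), element, reducer));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading("s1", 1L, 35.0), new Reading("s2", 2L, 20.0),
                new Reading("s1", 3L, 30.0), new Reading("s1", 4L, 40.0));
        List<Reading> results = keyedReduce(readings, r -> r.id,
                RollingReduceSketch::latestWithMaxTemperature);
        // Four inputs give four outputs; the third is s1 with timestamp 3 and temperature 35.0
        System.out.println(results);
    }
}
```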
Split and select
split
DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some characteristic of the elements
select
SplitStream → DataStream: obtains one or more DataStreams from a SplitStream
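The tagging idea behind split and select can be sketched in plain Java. This is not Flink code: SplitSelectSketch, tagByTemperature and the sample temperatures are hypothetical. The selector names the output each element belongs to, and select returns the elements whose names match any of the requested ones. Newer Flink releases deprecate and later drop split and select in favor of side outputs, so check the version you target.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of split followed by select; not Flink code.
final class SplitSelectSketch {
    // Hypothetical selector: names the output a temperature belongs to.
    static List<String> tagByTemperature(Double temperature) {
        return Collections.singletonList(temperature >= 60 ? "high" : "low");
    }

    // Returns the elements whose tags contain at least one of the requested names.
    static <T> List<T> select(List<T> input, Function<T, List<String>> selector, String... names) {
        List<String> wanted = Arrays.asList(names);
        List<T> selected = new ArrayList<>();
        for (T element : input) {
            if (!Collections.disjoint(selector.apply(element), wanted)) {
                selected.add(element);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Double> temperatures = Arrays.asList(35.5, 60.0, 72.3, 18.9);
        List<Double> high = select(temperatures, SplitSelectSketch::tagByTemperature, "high");
        List<Double> all = select(temperatures, SplitSelectSketch::tagByTemperature, "high", "low");
        System.out.println(high); // prints [60.0, 72.3]
        System.out.println(all);  // prints [35.5, 60.0, 72.3, 18.9]
    }
}
```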
Example
Split the sensor readings by temperature with 60 degrees as the threshold: readings at or above 60 degrees go to the high stream, and all others go to the low stream.
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;

public class Transform_Split_Select {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDatStream = env.addSource(new SourceFromCustom.CustomSource());
        // Split the stream at the 60-degree threshold
        SplitStream<SensorReading> splitStream = inputDatStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                Double temperature = sensorReading.getTemperature();
                if (temperature >= 60) {
                    return Lists.newArrayList("high");
                } else {
                    return Lists.newArrayList("low");
                }
            }
        });
        // SplitStream → DataStream: pick streams by name
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");
        high.print("high").setParallelism(1);
        low.print("low").setParallelism(1);
        all.print("all").setParallelism(1);
        env.execute();
    }
}
connect and coMap
Connect (One Country, Two Systems)
DataStream, DataStream → ConnectedStreams: connects two streams into one ConnectedStreams while each keeps its own data type. The two streams remain independent of each other, and their data types can be the same or different.
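coMap and coFlatMap
ConnectedStreams → DataStream: these work like map and flatMap, but because the two connected streams can have different data types, each stream is handled by its own method (map1 and map2, or flatMap1 and flatMap2). The two methods can apply different logic, but both return the single output type declared by the function.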
Example
Building on the split and select example, attach a warning label to the high-temperature data and output the normal-temperature data as it is.
Partial code:
// Per the declared types, highDataStream carries (id, temperature) pairs and lowDataStream carries SensorReading records
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectDataStream = highDataStream.connect(lowDataStream);
SingleOutputStreamOperator<Object> resultDataStream = connectDataStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> input) throws Exception {
        // Process high-temperature data: attach a warning label
        return new Tuple3<>(input.f0, input.f1, "warning");
    }

    @Override
    public Object map2(SensorReading input) throws Exception {
        // Process normal-temperature data: pass the fields through
        return new Tuple3<>(input.getId(), input.getTimestamp(), input.getTemperature());
    }
});
union
DataStream, DataStream → DataStream: unions two or more DataStreams to produce a new DataStream that contains all elements of every input stream. Note: if you union a DataStream with itself, each element appears twice in the new DataStream.
Differences between connect and union
- The streams passed to union must all have the same type; the two streams passed to connect can have different types
- After connect, the two streams can be brought to a common type in a later coMap
- connect works on exactly two streams, while union can combine more than two
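The differences above can be sketched with plain Java lists. This is not Flink code: ConnectVsUnionSketch, union and connectAndMap are hypothetical helpers. The sketch processes one list after the other, whereas real streams interleave their elements, so only the typing rules carry over.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of the typing rules of union and connect; not Flink code.
final class ConnectVsUnionSketch {
    // union-like: any number of inputs, all of the same element type.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> out = new ArrayList<>(first);
        for (List<T> other : others) {
            out.addAll(other);
        }
        return out;
    }

    // connect-like: exactly two inputs of possibly different types,
    // each with its own mapping function to the common result type.
    static <A, B, R> List<R> connectAndMap(List<A> left, List<B> right,
                                           Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : left) {
            out.add(map1.apply(a));
        }
        for (B b : right) {
            out.add(map2.apply(b));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> high = Arrays.asList(72.3, 65.0);
        List<Double> low = Arrays.asList(35.5);
        // A stream unioned with itself contributes its elements twice.
        List<Double> merged = union(high, low, high);
        System.out.println(merged); // prints [72.3, 65.0, 35.5, 72.3, 65.0]

        List<String> ids = Arrays.asList("sensor_1");
        List<String> labels = connectAndMap(ids, low, id -> id + ":warning", t -> "normal:" + t);
        System.out.println(labels); // prints [sensor_1:warning, normal:35.5]
    }
}
```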
Example
Building on the split and select example, union the high and low streams, then attach a warning label to the high-temperature data and output the normal-temperature data as it is.
Partial code:
DataStream<SensorReading> unionDataStream = high.union(low);
SingleOutputStreamOperator<Tuple3<String, Long, Object>> resultDataStream = unionDataStream.map(new MapFunction<SensorReading, Tuple3<String, Long, Object>>() {
    @Override
    public Tuple3<String, Long, Object> map(SensorReading input) throws Exception {
        if (input.getTemperature() >= 60) {
            // High temperature: replace the value with a warning label
            return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), "warning");
        } else {
            // Normal temperature: output the reading unchanged
            return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), input.getTemperature());
        }
    }
});
This article is reproduced from my personal blog post "The Transform operators of the Flink stream processing API" and is licensed under CC BY-SA 4.0.