1. Overview

Apache Flink was born with a design philosophy of supporting multiple forms of computation, including batch processing, stream processing, and machine learning, in a single engine. For stream computing in particular, Flink unifies streaming and batch at the compute-engine level. Up to Flink 1.10, however, the DataSet and DataStream APIs are still used for different application scenarios: the core DataSet classes live in the flink-java module, while the core DataStream implementation classes live in the flink-streaming-java module.

Both APIs support a rich and similar set of operations, such as the common transformation functions map, filter, and join. In Flink's programming model, the Source of a DataSet is usually a file, a table, or a Java collection, while the Source of a DataStream is usually message middleware such as Kafka.
2. Taking DataStream as an example

The basic building blocks of a Flink program are Streams and Transformations. Each data stream starts at one or more Sources and ends at one or more Sinks; the resulting data flow resembles a directed acyclic graph (DAG).
3. Operators
(1) A custom real-time data source

Implement a custom real-time data source using the SourceFunction interface provided by Flink:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class MyStreamingSource implements SourceFunction<MyStreamingSource.Item> {

    private volatile boolean isRunning = true;

    /**
     * Override the run method to continuously produce and emit data.
     * @param ctx the source context used to emit elements
     */
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);
            // Emit one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // Generate a random Item
    private Item generateItem() {
        int i = new Random().nextInt(100);
        Item item = new Item();
        item.setName("name" + i);
        item.setId(i);
        return item;
    }

    public static class Item {
        private String name;
        private Integer id;

        Item() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Item{" +
                    "name='" + name + '\'' +
                    ", id=" + id +
                    '}';
        }
    }
}

class StreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data source.
        // Note: parallelism is set to 1; this is covered in more detail in a later lesson.
        DataStreamSource<MyStreamingSource.Item> text =
                env.addSource(new MyStreamingSource()).setParallelism(1);
        DataStream<MyStreamingSource.Item> item = text.map(
                (MapFunction<MyStreamingSource.Item, MyStreamingSource.Item>) value -> value);
        // Print the result
        item.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}
```
(2) Map

Map takes one element as input and produces one element as output, transformed according to the developer's custom logic.
```java
class StreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data source
        DataStreamSource<MyStreamingSource.Item> items =
                env.addSource(new MyStreamingSource()).setParallelism(1);
        // Map each Item to its name
        SingleOutputStreamOperator<Object> mapItems = items.map(
                new MapFunction<MyStreamingSource.Item, Object>() {
                    @Override
                    public Object map(MyStreamingSource.Item item) throws Exception {
                        return item.getName();
                    }
                });
        // Print the result
        mapItems.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}
```
You can also define your own MapFunction as a separate class:
```java
class StreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data source
        DataStreamSource<MyStreamingSource.Item> items =
                env.addSource(new MyStreamingSource()).setParallelism(1);
        SingleOutputStreamOperator<String> mapItems = items.map(new MyMapFunction());
        // Print the result
        mapItems.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }

    static class MyMapFunction extends RichMapFunction<MyStreamingSource.Item, String> {
        @Override
        public String map(MyStreamingSource.Item item) throws Exception {
            return item.getName();
        }
    }
}
```
(3) FlatMap

FlatMap takes one element and returns zero or more elements. FlatMap is somewhat similar to Map, but when the return value is a list, FlatMap "flattens" the list, that is, it outputs each element individually.
```java
SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(
        new FlatMapFunction<MyStreamingSource.Item, Object>() {
            @Override
            public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
                String name = item.getName();
                collector.collect(name);
            }
        });
```
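The one-to-many contract of flatMap (each input element may emit zero or more outputs via the Collector) can be sketched outside Flink with plain Java. The line-splitting example below is purely illustrative and has no Flink dependency; the class and method names are made up for this sketch:

```java
import java.util.ArrayList;
import java.util.List;

public class FlatMapDemo {
    // Like FlatMapFunction.flatMap: one input line, zero or more collected words
    static void flatMap(String line, List<String> collector) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                collector.add(word);
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        flatMap("hello flink stream", out);
        System.out.println(out); // [hello, flink, stream]
    }
}
```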
(4) Filter

Filter, as the name implies, filters out unwanted data: each element is passed through the filter function, and elements for which it returns true are retained while the rest are discarded.
```java
SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(
        new FilterFunction<MyStreamingSource.Item>() {
            @Override
            public boolean filter(MyStreamingSource.Item item) throws Exception {
                return item.getId() % 2 == 0;
            }
        });
```
(5) KeyBy

It is often necessary to group data by some attribute or field and then process the different groups differently. KeyBy partitions the stream by the given key, so that all records with the same key are processed together.
```java
// Split, group, window, and aggregate the received data
DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5), Time.seconds(1))
        ...
```
(6) Aggregations

Aggregations are the built-in aggregation functions; common ones include, but are not limited to, sum, max, and min. Note that these are rolling aggregations that emit an updated result for every incoming element and keep state for every key, so try to avoid using Aggregations directly on an unbounded stream without windows.
```java
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
```
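The difference between max and maxBy (likewise min and minBy) is easy to miss: max only advances the aggregated field and keeps the other fields from the first record seen for the key, while maxBy keeps the entire record that holds the maximum. A minimal plain-Java sketch of the two behaviors, with no Flink dependency and illustrative names only:

```java
import java.util.List;

public class MaxVsMaxBy {
    // (key, attr, value) mirrors the Tuple3 records used later in this article
    record Row(int key, int attr, int value) {}

    // Simulates keyedStream.max(2): only the aggregated field advances;
    // the other fields keep the values of the first record seen for the key
    static Row rollingMax(List<Row> records) {
        Row acc = records.get(0);
        for (Row r : records.subList(1, records.size())) {
            if (r.value() > acc.value()) {
                acc = new Row(acc.key(), acc.attr(), r.value());
            }
        }
        return acc;
    }

    // Simulates keyedStream.maxBy(2): the entire record holding the max value wins
    static Row rollingMaxBy(List<Row> records) {
        Row acc = records.get(0);
        for (Row r : records.subList(1, records.size())) {
            if (r.value() > acc.value()) {
                acc = r;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Row> records = List.of(new Row(0, 1, 0), new Row(0, 2, 2));
        System.out.println(rollingMax(records));   // Row[key=0, attr=1, value=2]
        System.out.println(rollingMaxBy(records)); // Row[key=0, attr=2, value=2]
    }
}
```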
Here’s an example:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Build the data source
List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(0, 1, 0));
data.add(new Tuple3<>(0, 1, 1));
data.add(new Tuple3<>(0, 2, 2));
data.add(new Tuple3<>(0, 1, 3));
data.add(new Tuple3<>(1, 2, 5));
data.add(new Tuple3<>(1, 2, 9));
data.add(new Tuple3<>(1, 2, 11));
data.add(new Tuple3<>(1, 2, 13));
DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);
items.keyBy(0).max(2).printToErr();
// Print the result
String jobName = "user defined streaming source";
env.execute(jobName);
```
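Because max is a rolling aggregation, an updated record is emitted for every input element, not just a final result, and only field 2 advances while fields 0 and 1 stay as in the first record of the key. A plain-Java sketch (no Flink dependency, names are illustrative) of the rolling output for the key-0 records above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RollingMaxDemo {
    // Emits one updated record per input element, like keyBy(0).max(2) does for a single key
    static List<String> rollingMax(int[][] input) {
        List<String> out = new ArrayList<>();
        int[] acc = input[0].clone();
        out.add(Arrays.toString(acc));
        for (int i = 1; i < input.length; i++) {
            // max(2): only field 2 advances; fields 0 and 1 stay as in the accumulator
            acc[2] = Math.max(acc[2], input[i][2]);
            out.add(Arrays.toString(acc));
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the key-0 records: (0,1,0), (0,1,1), (0,2,2), (0,1,3)
        int[][] input = { {0, 1, 0}, {0, 1, 1}, {0, 2, 2}, {0, 1, 3} };
        rollingMax(input).forEach(System.out::println);
    }
}
```

Note that even when (0, 2, 2) arrives, field 1 of the emitted record stays 1, because max only updates the aggregated field.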
(7) Reduce

Reduce operates on each group of a KeyedStream, aggregating the records within a group according to user-defined aggregation logic.
```java
List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(0, 1, 0));
data.add(new Tuple3<>(0, 1, 1));
data.add(new Tuple3<>(0, 2, 2));
data.add(new Tuple3<>(0, 1, 3));
data.add(new Tuple3<>(1, 2, 5));
data.add(new Tuple3<>(1, 2, 9));
data.add(new Tuple3<>(1, 2, 11));
data.add(new Tuple3<>(1, 2, 13));
DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> reduce = items.keyBy(0)
        .reduce(new ReduceFunction<Tuple3<Integer, Integer, Integer>>() {
            @Override
            public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> t1,
                                                            Tuple3<Integer, Integer, Integer> t2) throws Exception {
                Tuple3<Integer, Integer, Integer> newTuple = new Tuple3<>();
                newTuple.setFields(0, 0, (Integer) t1.getField(2) + (Integer) t2.getField(2));
                return newTuple;
            }
        });
reduce.printToErr().setParallelism(1);
```
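Reduce is also a rolling operation: the first element of each key passes through unchanged, and every subsequent element produces a new partial aggregate. A plain-Java sketch (no Flink dependency, illustrative names) tracking only field 2, the field the ReduceFunction above sums:

```java
import java.util.ArrayList;
import java.util.List;

public class RollingReduceDemo {
    // Mirrors the ReduceFunction above, applied as a rolling fold over field 2
    static List<Integer> rollingSum(int[] values) {
        List<Integer> out = new ArrayList<>();
        int acc = values[0];
        out.add(acc);             // the first element of a key passes through unchanged
        for (int i = 1; i < values.length; i++) {
            acc += values[i];     // reduce(t1, t2) on field 2
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // Field-2 values for key 0 and key 1 in the example data
        System.out.println(rollingSum(new int[]{0, 1, 2, 3}));   // [0, 1, 3, 6]
        System.out.println(rollingSum(new int[]{5, 9, 11, 13})); // [5, 14, 25, 38]
    }
}
```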