Custom source
Just pass in a SourceFunction
val stream4 = env.addSource( new MySensorSource() )
Example: generate sensor data randomly
The sensor data is simply assembled from randomly generated values.
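A minimal sketch of what MySensorSource might look like, assuming a SensorReading(id, timestamp, temperature) case class; the sensor count and random-walk details are illustrative, not the repo's exact implementation:
import scala.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction

// assumed data type for the examples in this section
case class SensorReading(id: String, timestamp: Long, temperature: Double)

class MySensorSource extends SourceFunction[SensorReading] {
  // flag checked by run() so that cancel() can stop the loop
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // give each of 10 sensors a random baseline temperature
    var curTemps = (1 to 10).map(i => (s"sensor_$i", 60 + rand.nextGaussian() * 20))
    while (running) {
      // random-walk every temperature, then emit one reading per sensor
      curTemps = curTemps.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
      val ts = System.currentTimeMillis()
      curTemps.foreach { case (id, temp) => ctx.collect(SensorReading(id, ts, temp)) }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}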
Transform
- map (the basic conversion operator)
val streamMap = stream.map { x => x * 2 }
- flatMap
a. List(1, 2, 3).flatMap(i => List(i, i)) results in List(1, 1, 2, 2, 3, 3)
b. List("a b", "c d").flatMap(line => line.split(" ")) results in List(a, b, c, d)
Code:
val streamFlatMap = stream.flatMap {
  x => x.split(" ")
}
- Filter
val streamFilter = stream.filter {
  x => x == 1
}
- KeyBy
KeyBy converts a DataStream into a KeyedStream, logically splitting the stream into disjoint partitions.
Each partition contains elements with the same key; internally this is implemented with hash partitioning.
Code:
val aggStream = dataStream.keyBy("id")
- Rolling Aggregation
Performs an aggregation over each sub-stream (partition) of a KeyedStream:
sum()
min()
max()
minBy()
maxBy()
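For example, on the sensor stream (a sketch assuming the SensorReading type from the custom-source example): min() only tracks the minimum of the aggregated field, leaving the other fields at their first-seen values, while minBy() returns the whole element that contains the minimum:
val minTemps = dataStream
  .keyBy("id")
  .min("temperature")    // running minimum of the temperature field only

val minReadings = dataStream
  .keyBy("id")
  .minBy("temperature")  // the entire record holding the minimum temperature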
- Reduce
Source code:
https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/TransformTest.scala
Reduce turns a KeyedStream into a DataStream: it aggregates a keyed stream
by merging the current element with the result of the previous aggregation to produce a new value.
The returned stream contains the result of every aggregation,
not just the final result of the last one.
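A minimal sketch of reduce on the sensor stream (again assuming the SensorReading type from above): keep the latest timestamp but the lowest temperature seen so far for each sensor id:
val reduced = dataStream
  .keyBy("id")
  .reduce { (last, cur) =>
    // merge the previous aggregate with the current element into a new value
    SensorReading(last.id, cur.timestamp, last.temperature.min(cur.temperature))
  }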
- ConnectedStreams
Connects two streams while preserving their types. The two connected streams are simply placed inside the same stream; internally their data and form stay unchanged, and the two streams remain independent of each other.
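A minimal sketch: derive a (String, Double) warning stream from the sensor stream and connect it with the original stream; note the two sides keep different types:
import org.apache.flink.streaming.api.scala._

val warningStream: DataStream[(String, Double)] =
  dataStream.map(r => (r.id, r.temperature))
// both element types live side by side in the connected stream
val connectedStreams = warningStream.connect(dataStream)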
- CoMap, CoFlatMap
CoMap and CoFlatMap are to ConnectedStreams what map and flatMap are to a DataStream:
each stream in the ConnectedStreams is processed by its own map/flatMap function.
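Continuing the connect sketch above, each side of the ConnectedStreams gets its own map function, and the two result types are unified here:
val coMapped = connectedStreams.map(
  warning => (warning._1, warning._2, "warning"),          // map for the first stream
  reading => (reading.id, reading.temperature, "healthy")  // map for the second stream
)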
- Union
A union of two or more DataStreams produces a new DataStream containing all elements of the input DataStreams
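A minimal sketch, assuming three hypothetical sensor streams stream1, stream2, stream3 of the same element type:
// union accepts any number of streams, all of the same type
val allReadings: DataStream[SensorReading] = stream1.union(stream2, stream3)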
Connect vs. union:
1. Union requires the streams to have the same type beforehand; connect accepts streams of different types, which can then be adjusted to the same type in coMap
2. Connect can only handle two streams; union can handle more than two
Supported data types
A Flink streaming application processes a stream of events represented as data objects
These data objects need to be serialized and deserialized
in order to transport them over the network or to read and write them from state backends, checkpoints, and savepoints
Flink uses the concept of type information (TypeInformation) to represent data types
and generates a specific serializer, deserializer, and comparator for each data type
Flink has a type-extraction system that analyzes the input and return types of functions
to obtain the type information, and with it the serializers and deserializers, automatically
Lambda functions and generic types may require the type information to be provided explicitly for the application to work, or to improve performance
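In the Scala API this usually means having an implicit TypeInformation in scope; a minimal sketch (the tuple type is just an example):
// importing the package object brings the implicit TypeInformation derivation into scope
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation

// ...or materialize it explicitly, e.g. to pass to a generic helper
val tupleInfo: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]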
Basic data types
Flink supports all the common Java and Scala data types
- Long
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
- tuples
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah"23)),
persons.filter(p => p._2 > 18)
- Case class
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah"23)),
persons.filter(p => p.age > 18)
- Simple class (Java POJO)
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42).
new Person("Wendy", 23));
- Flink also supports other Java and Scala types (ArrayList, HashMap, Enum, etc.)
Implement UDF functions – more granular control flow
Function classes
Flink exposes the interfaces of all UDF functions (implemented as interfaces or abstract classes)
Such as:
MapFunction, FilterFunction, ProcessFunction
- Implement the FilterFunction interface
class FlinkFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
val flinkTweets = tweets.filter(new FlinkFilter)
- Implement functions as anonymous classes
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
)
- Passing the string in as a constructor argument
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}
Anonymous functions (Lambda functions)
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))
// _.id is shorthand for data => data.id
dataStream.filter(_.id.startsWith("sensor_1")).print()
Rich Functions
All Flink function class interfaces have a Rich version.
Unlike regular functions, rich functions can access the runtime context
and have lifecycle methods,
which makes more complex functionality possible:
a. RichMapFunction
b. RichFlatMapFunction
c. RichFilterFunction
Lifecycle:
a. open() is the rich function's initialization method;
it is called before an operator such as map or filter is invoked
b. close() is the last method called in the lifecycle and does cleanup work
c. getRuntimeContext() provides information from the function's RuntimeContext,
such as the function's execution parallelism, the task name, and state
- Code:
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// You can do some initialization work, such as establishing a connection to HDFS
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
}
override def close(): Unit = {
// Perform some cleaning tasks, such as disconnect from HDFS.
}
}