Custom source

Just pass in a SourceFunction

val stream4 = env.addSource( new MySensorSource() )


Example: Generate sensor data randomly


The sensor data is simply assembled from randomly generated values; a sketch of such a source follows.
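A minimal sketch of what such a MySensorSource might look like, assuming a SensorReading case class with id, timestamp, and temperature fields (the type and field names are an assumption, not taken from the original):

import scala.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Assumed data type for the sensor readings
case class SensorReading(id: String, timestamp: Long, temperature: Double)

class MySensorSource extends SourceFunction[SensorReading] {
    // flag signalling whether the source is still running
    var running: Boolean = true

    override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
        val rand = new Random()
        // initialize ten sensors with a random base temperature
        var curTemps = (1 to 10).map(i => ("sensor_" + i, 60 + rand.nextGaussian() * 20))

        while (running) {
            // random-walk each temperature and emit a reading
            curTemps = curTemps.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
            val ts = System.currentTimeMillis()
            curTemps.foreach { case (id, temp) => ctx.collect(SensorReading(id, ts, temp)) }
            Thread.sleep(1000)
        }
    }

    override def cancel(): Unit = running = false
}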


Transform

  • map

val streamMap = stream.map { x => x * 2 }

  • flatMap
A.

List(1, 2, 3).flatMap(i => List(i, i)) results in List(1, 1, 2, 2, 3, 3)

B.

List("a b", "c d").flatMap(line => line.split(" ")) results in List(a, b, c, d)

Code:



val streamFlatMap = stream.flatMap{

 x => x.split(" ")

}



  • Filter

val streamFilter = stream.filter{

 x => x == 1

}

  • KeyBy

DataStream → KeyedStream: logically splits a stream into disjoint partitions

Each partition contains elements with the same key; internally this is implemented by hash partitioning

Code:



val aggStream = dataStream.keyBy("id")

  • Rolling Aggregation
Performs a rolling aggregation on each sub-stream of the KeyedStream (see the sketch after the following list)



sum()

min()

max()

minBy()

maxBy()
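
For example, on a keyed stream of the SensorReading type sketched earlier (an assumption), a rolling minimum might look like this; min() only updates the aggregated field, while minBy() returns the complete element that holds the minimum:

val minTempStream = dataStream
    .keyBy("id")
    .min("temperature")    // updates only the temperature field

val minByTempStream = dataStream
    .keyBy("id")
    .minBy("temperature")  // returns the whole record with the minimum temperature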

  • Reduce

Source code:

https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/TransformTest.scala

KeyedStream → DataStream: an aggregation operation over a grouped data stream

It merges the current element with the result of the last aggregation to produce a new value

The returned stream contains the result of every aggregation, rather than only the final result of the last aggregation, as in the sketch below
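
A sketch of reduce on the keyed sensor stream (again assuming the SensorReading type from above): keep the latest timestamp but the minimum temperature seen so far:

val reduceStream = dataStream
    .keyBy("id")
    .reduce { (cur, newData) =>
        // merge the last aggregate with the new element:
        // newest timestamp, smallest temperature
        SensorReading(cur.id, newData.timestamp, cur.temperature.min(newData.temperature))
    }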


  • Split
DataStream → SplitStream: splits one DataStream into two or more DataStreams according to given criteria
  • Select
SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream, as in the sketch below
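
A minimal sketch of split and select on the sensor stream (the threshold and stream names are illustrative; note that split/select were deprecated in later Flink versions in favor of side outputs):

// tag each reading as "high" or "low" depending on its temperature
val splitStream = dataStream.split(sensor =>
    if (sensor.temperature > 30) Seq("high") else Seq("low")
)

val highStream = splitStream.select("high")
val lowStream = splitStream.select("low")
val allStream = splitStream.select("high", "low")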
  • ConnectedStreams
Connects two streams while each keeps its own type. After connect, the two streams are simply placed inside the same stream; their data and form remain unchanged internally, and the two streams stay independent of each other


  • CoMap, CoFlatMap
ConnectedStreams → DataStream: works like map and flatMap

Each stream inside the ConnectedStreams is processed by its own map/flatMap function, as in the sketch below
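
A sketch of connect plus CoMap, reusing the high/low streams from the split example (the warning tuple shape is an assumption):

// derive an (id, temperature) warning stream from the high-temperature stream
val warningStream = highStream.map(sensor => (sensor.id, sensor.temperature))

// connect keeps both element types side by side
val connectedStream = warningStream.connect(lowStream)

// the CoMap applies one function per input stream; here both produce the same tuple type
val coMapStream = connectedStream.map(
    warning => (warning._1, warning._2, "high temperature warning"),
    low => (low.id, low.temperature, "normal")
)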


  • Union
A union of two or more DataStreams produces a new DataStream that contains all elements of the input streams, as in the sketch below
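
For example, with the streams from the split sketch above (union requires all inputs to share one element type):

// merge several DataStream[SensorReading] into one
val unionStream = highStream.union(lowStream, allStream)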


Differences between connect and union:

1. The two streams being unioned must have the same type; connect allows different types, which can then be adjusted to a common type in the subsequent coMap

2. Connect can only handle two streams, while union can handle more than two


Supported data types

A Flink streaming application processes streams of events represented as data objects

These data objects need to be serialized and deserialized in order to transport them over the network or to read them from state backends, checkpoints, and savepoints



Flink uses the concept of type information (TypeInformation) to represent data types and generates specific serializers, deserializers, and comparators for each data type

Flink also has a type extraction system that analyzes the input and return types of functions to automatically obtain the type information and, from it, the serializers and deserializers

For lambda functions or generic types, however, the type information must be provided explicitly for the application to work or to improve performance; a sketch follows
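
In the Scala API this typically means having the implicit TypeInformation in scope; a minimal sketch using the SensorReading type assumed earlier:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._ // brings implicit TypeInformation derivation into scope

// explicitly materialize the type information for a data type
val sensorType: TypeInformation[SensorReading] = createTypeInformation[SensorReading]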


Basic data types

Flink supports data types that are common in Java and Scala

  • Long
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)



numbers.map( n => n + 1 )

  • tuples
val persons: DataStream[(String, Integer)] = env.fromElements(

 ("Adam", 17),

 ("Sarah"23)),

  

persons.filter(p => p._2 > 18)

  • Case classes
case class Person(name: String, age: Int)



val persons: DataStream[Person] = env.fromElements(

    Person("Adam", 17),

    Person("Sarah"23)),

    

persons.filter(p => p.age > 18)

  • Simple classes (Java POJOs)
public class Person {

    public String name;

    public int age;

    

    public Person() {}

    

    public Person(String name, int age) {

    this.name = name;

    this.age = age;

    }

}



DataStream<Person> persons = env.fromElements(

    new Person("Alex", 42).

    new Person("Wendy", 23));

  • Flink also supports Java and Scala collection types (ArrayList, HashMap, Enum, etc.)

Implement UDF functions – more fine-grained control

Function classes

Flink exposes interfaces for all UDF functions (implemented as interfaces or abstract classes)



Such as:



MapFunction, FilterFunction, ProcessFunction

  • Implement the FilterFunction interface
class FlinkFilter extends FilterFunction[String] {
    override def filter(value: String): Boolean = {
        value.contains("flink")
    }
}



val flinkTweets = tweets.filter(new FlinkFilter)

  • Implement functions as anonymous classes
val flinkTweets = tweets.filter(

    new RichFilterFunction[String] {

        override def filter(value: String): Boolean = {

        value.contains("flink")

        }

    }

)

  • Parameters, such as the keyword string, can also be passed in to the function class
val tweets: DataStream[String] = ...



val flinkTweets = tweets.filter(new KeywordFilter("flink"))



class KeywordFilter(keyWord: String) extends FilterFunction[String] {

    override def filter(value: String): Boolean = {

        value.contains(keyWord)

    }

}


Anonymous functions (lambda functions)

val tweets: DataStream[String] = ...



val flinkTweets = tweets.filter(_.contains("flink"))

// _.id is shorthand for data => data.id



dataStream.filter(_.id.startsWith("sensor_1")).print()


Rich Functions

"Rich functions" are a function-class interface provided by the DataStream API; all Flink function classes have a Rich version



Unlike regular functions, they can access the runtime context and have lifecycle methods, which makes more complex functionality possible



For example: RichMapFunction, RichFlatMapFunction, RichFilterFunction



Lifecycle:

open() is the initialization method of a Rich function; it is called before an operator such as map or filter is invoked

close() is the last method called in the lifecycle and performs cleanup work

getRuntimeContext() provides information from the function's RuntimeContext, such as the function's parallelism, the name of the task, and state



  • Code
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {

    var subTaskIndex = 0

    

    override def open(configuration: Configuration): Unit = {

        subTaskIndex = getRuntimeContext.getIndexOfThisSubtask

// You can do some initialization work, such as establishing a connection to HDFS

    }

    

    override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {

        if (in % 2 == subTaskIndex) {

         out.collect((subTaskIndex, in))

        }

    }

    override def close(): Unit = {

// Perform some cleaning tasks, such as disconnect from HDFS.

    }

}
