When we watch live streams, one of the most important features, for streamers and viewers alike, is the culture of bullet comments (danmaku, also called "barrage"). To make live streams more engaging and interactive, online live-streaming platforms have adopted scrolling bullet comments as a real-time way for viewers to communicate. The rich content and varied forms of bullet-comment data reflect complex user attributes and behaviors. Studying and understanding the users of live-streaming platforms has practical value in many areas, such as reviewing and moderating bullet-comment content, predicting trending public-opinion topics, and generating personalized summaries and annotations.
This article does not analyze the broader application value of bullet-comment data. Instead, it uses the review and monitoring of bullet-comment content as a case study for understanding the concepts and features of Flink CEP.
When users send bullet comments, a live-streaming platform mainly needs to identify two kinds of users in real time: users who post abusive comments, and users who flood the screen by repeating the same message.
With these two kinds of users in mind, let's first introduce the Flink CEP API and then use CEP to solve both problems.
This article is published on the official account [5 minutes to Learn Big Data], an account that publishes original technical content on big data.
Flink CEP
What is Flink CEP
Flink CEP is a complex event processing (CEP) library built on Flink. It detects complex event patterns in data streams, identifies meaningful events (such as opportunities or threats), and lets you respond as quickly as possible, instead of discovering problems days or months later.
Flink CEP API
At the heart of the CEP API is the Pattern API, which lets you define complex event patterns quickly. Each pattern consists of several stages, which we can also call states. To move from one state to the next, the user specifies conditions; these conditions can concern adjacent events (how they follow one another) or individual events (filters on each event).
Before introducing the API, let’s understand a few concepts:
1. Patterns and pattern sequences
- A single simple pattern is called a pattern, and the final complex pattern sequence that is searched for in the data stream is called a pattern sequence. Each complex pattern sequence consists of multiple simple patterns.
- Each pattern must have a unique name, which we can use to identify the events matched by that pattern.
2. Individual patterns
A pattern can be either a singleton pattern or a looping pattern. A singleton pattern accepts a single event, while a looping pattern can accept multiple events.
3. Pattern example:
The pattern is: A B+ C? D
A, B, C and D are patterns. The + marks a loop, so B+ is a looping pattern; the ? marks an optional pattern, so C? is optional.
In other words, A is followed by one or more Bs, then optionally by a C, and finally by a D.
A, C? and D are singleton patterns, and B+ is a looping pattern.
By default, patterns are singleton patterns; quantifiers can turn them into looping patterns.
Each pattern can have one or more conditions, evaluated as events arrive. In other words, a pattern accepts an event only if the event satisfies its conditions.
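The quantifier notation reads like a regular expression. As an illustration only (this is not how Flink matches events), the sketch below reduces each event to the letter of the pattern it satisfies and checks the notation with a plain regex. The object name PatternNotation is hypothetical, and the regex treats events as strictly contiguous, which corresponds to chaining the patterns with next().

```scala
object PatternNotation {
  // Each event is reduced to one letter naming the pattern it satisfies.
  // Regex quantifiers mirror the CEP notation: + is "one or more", ? is "optional".
  private val notation = "AB+C?D"

  /** True if the whole event sequence is one strictly contiguous match of A B+ C? D. */
  def matches(events: String): Boolean = events.matches(notation)
}
```

For example, "ABD" and "ABBBCD" are accepted, while "AD" (no B) and "ABCCD" (C appears twice) are rejected.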
With these concepts in mind, let's look at the CEP APIs used in this case.
CEP APIs used in this case:
- begin: defines the starting pattern state.
  Usage: val start = Pattern.begin("start")
- next: appends a new pattern state. The matching event must immediately follow the previous matching event (strict contiguity).
  Usage: val next = start.next("next")
- where: defines a filter condition for the current pattern state. An event matches the state only if it passes the filter.
  Usage: patternState.where(_.message == "yyds")
- within: defines the maximum time span within which a sequence of events must match the pattern. An unfinished event sequence that exceeds this time is discarded.
  Usage: patternState.within(Time.seconds(10))
- times: the number of times an event of the given type must occur.
  Usage: patternState.times(5)
With the APIs covered, let's tackle the examples mentioned at the beginning of this article:
Case study: monitoring users' bullet-comment behavior
Case 1: Detecting malicious users
Rule: if a user sends "TMD" (an abusive expression) 5 or more times within 10 seconds, the user is considered to be launching a malicious attack and is flagged.
Using Flink CEP to detect malicious users:
```scala
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object BarrageBehavior01 {
  case class LoginEvent(userId: String, message: String, timestamp: Long) {
    override def toString: String = userId
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time: timestamps come from the events themselves
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Parallelism 1 makes the processing order of the test data easy to observe
    env.setParallelism(1)

    // Simulated data source (timestamps are in seconds)
    val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
      List(
        LoginEvent("1", "TMD", 1618498576),
        LoginEvent("1", "TMD", 1618498577),
        LoginEvent("1", "TMD", 1618498579),
        LoginEvent("1", "TMD", 1618498582),
        LoginEvent("2", "TMD", 1618498583),
        LoginEvent("1", "TMD", 1618498585)
      )
    ).assignAscendingTimestamps(_.timestamp * 1000) // seconds -> milliseconds

    // Define the pattern: 5 "TMD" messages within 10 seconds
    val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
      .where(_.message == "TMD")
      .times(5)
      .within(Time.seconds(10))

    // Apply the pattern to each user's events separately
    val patternStream: PatternStream[LoginEvent] =
      CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)

    import scala.collection.Map
    val result: DataStream[(String, Long)] = patternStream.select((pattern: Map[String, Iterable[LoginEvent]]) => {
      // The first event of the match identifies the user and when the attack started
      val first = pattern("begin").iterator.next()
      (first.userId, first.timestamp)
    })

    // In practice a malicious user could be muted here; to keep things simple we only print the user
    result.print("Malicious User >>>")
    env.execute("BarrageBehavior01")
  }
}
```
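To know what the job above should report, it helps to have an independent reference. The sketch below is a plain-Scala check of the same rule over the same sample data, with no Flink involved. MaliciousUserCheck and flagged are hypothetical names, and the sketch assumes the time span is measured from the first to the last of five consecutive "TMD" events and must be strictly shorter than the window, mirroring within(Time.seconds(10)).

```scala
object MaliciousUserCheck {
  // Same shape as the event in BarrageBehavior01; timestamps are in seconds
  final case class LoginEvent(userId: String, message: String, timestamp: Long)

  val sample: Seq[LoginEvent] = Seq(
    LoginEvent("1", "TMD", 1618498576L), LoginEvent("1", "TMD", 1618498577L),
    LoginEvent("1", "TMD", 1618498579L), LoginEvent("1", "TMD", 1618498582L),
    LoginEvent("2", "TMD", 1618498583L), LoginEvent("1", "TMD", 1618498585L))

  /** Users who sent `keyword` at least n times with all n occurrences inside a window of windowSec seconds. */
  def flagged(events: Seq[LoginEvent], keyword: String, n: Int, windowSec: Long): Set[String] =
    events.filter(_.message == keyword).groupBy(_.userId).collect {
      // n consecutive keyword timestamps (sorted) whose span is shorter than the window
      case (user, evs) if evs.map(_.timestamp).sorted.sliding(n)
          .exists(w => w.size == n && w.last - w.head < windowSec) => user
    }.toSet
}
```

On the sample data only user 1 is flagged: its five messages span 9 seconds, while user 2 sends a single one.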
Case 2: Detecting screen-flooding users
Rule: if a user sends the same message 5 or more times within 10 seconds, it is considered malicious screen flooding.
Using Flink CEP to detect screen-flooding users:
```scala
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object BarrageBehavior02 {
  case class Message(userId: String, ip: String, msg: String)

  def main(args: Array[String]): Unit = {
    // Initialize the runtime environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The messages carry no timestamps, so match in processing time.
    // (On Flink 1.12+, CEP defaults to event time; there, call inProcessingTime() on the PatternStream.)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // Set parallelism
    env.setParallelism(1)

    // Simulated data source
    val loginEventStream: DataStream[Message] = env.fromCollection(
      List(
        Message("1", "192.168.0.1", "beijing"),
        Message("1", "192.168.0.2", "beijing"),
        Message("1", "192.168.0.3", "beijing"),
        Message("1", "192.168.0.4", "beijing"),
        Message("2", "192.168.10.10", "shanghai"),
        Message("3", "192.168.10.10", "beijing"),
        Message("3", "192.168.10.11", "beijing"),
        Message("4", "192.168.10.10", "beijing"),
        Message("5", "192.168.10.11", "shanghai"),
        Message("4", "192.168.10.12", "beijing"),
        Message("5", "192.168.10.13", "shanghai"),
        Message("5", "192.168.10.14", "shanghai"),
        Message("5", "192.168.10.15", "beijing"),
        Message("6", "192.168.10.16", "beijing"),
        Message("6", "192.168.10.17", "beijing"),
        Message("6", "192.168.10.18", "beijing"),
        Message("5", "192.168.10.18", "shanghai"),
        Message("6", "192.168.10.19", "beijing"),
        Message("6", "192.168.10.19", "beijing"),
        Message("5", "192.168.10.18", "shanghai")))

    // Define the pattern: 5 non-empty messages from the same user within 10 seconds
    val loginbeijingPattern = Pattern.begin[Message]("start")
      .where(_.msg != null)
      .times(5)
      .within(Time.seconds(10))

    // Match the pattern per user
    val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)

    // Keep a match only if all five messages have the same text
    val loginbeijingResult: DataStream[Option[Iterable[Message]]] =
      loginbeijingDataPattern.select((pattern: collection.Map[String, Iterable[Message]]) => {
        val events = pattern("start")
        if (events.toList.map(x => (x.userId, x.msg)).distinct.size == 1) Some(events) else None
      })

    // Print the matches that passed the check
    loginbeijingResult.filter(_.isDefined).map(_.get).print()
    env.execute("BarrageBehavior02")
  }
}
```
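Likewise, here is a plain-Scala reference for the flooding rule. Because the sample messages carry no timestamps, this hypothetical FloodingUserCheck leaves out the 10-second window and simply counts identical (userId, msg) pairs; with real data the time constraint would have to be added back. Unlike the CEP job above, which compares the texts inside each five-message match, it counts across all of a user's messages, so other messages interleaved between the repeats do not hide them.

```scala
object FloodingUserCheck {
  final case class Message(userId: String, ip: String, msg: String)

  // Same sample data as BarrageBehavior02
  val sample: Seq[Message] = Seq(
    Message("1", "192.168.0.1", "beijing"), Message("1", "192.168.0.2", "beijing"),
    Message("1", "192.168.0.3", "beijing"), Message("1", "192.168.0.4", "beijing"),
    Message("2", "192.168.10.10", "shanghai"), Message("3", "192.168.10.10", "beijing"),
    Message("3", "192.168.10.11", "beijing"), Message("4", "192.168.10.10", "beijing"),
    Message("5", "192.168.10.11", "shanghai"), Message("4", "192.168.10.12", "beijing"),
    Message("5", "192.168.10.13", "shanghai"), Message("5", "192.168.10.14", "shanghai"),
    Message("5", "192.168.10.15", "beijing"), Message("6", "192.168.10.16", "beijing"),
    Message("6", "192.168.10.17", "beijing"), Message("6", "192.168.10.18", "beijing"),
    Message("5", "192.168.10.18", "shanghai"), Message("6", "192.168.10.19", "beijing"),
    Message("6", "192.168.10.19", "beijing"), Message("5", "192.168.10.18", "shanghai"))

  /** (userId, msg) pairs where the user sent the same text at least n times. */
  def flagged(messages: Seq[Message], n: Int): Set[(String, String)] =
    messages.groupBy(m => (m.userId, m.msg)).collect { case (key, ms) if ms.size >= n => key }.toSet
}
```

On the sample data it flags user 5 ("shanghai") and user 6 ("beijing"); user 1 sends "beijing" only four times.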
Flink CEP API
Besides the APIs used in the examples, here are some other commonly used APIs:
1. Condition APIs
For an incoming event to be accepted by a pattern, the pattern specifies conditions that the event must satisfy. These conditions can be based on properties of the event itself, or on statistics over the events previously accepted by the pattern. For example, an event's value must be greater than 5, or greater than the average value of the previously accepted events.
Conditions are specified with pattern.where(), pattern.or() or pattern.until(). A condition can be either an IterativeCondition or a SimpleCondition.
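To make the SimpleCondition/IterativeCondition distinction concrete, here is a plain-Scala model of the two example conditions above. It is not Flink code: the names are hypothetical, events are bare Double values, and the loop keeps an event only if it beats the average of the events accepted so far. Accepting the very first event unconditionally is an assumption, since the text leaves that case open. In Flink, an iterative condition reads the previously accepted events through its context rather than from an explicit list.

```scala
object ConditionSketch {
  // SimpleCondition analogue: looks only at the incoming event
  val simple: Double => Boolean = _ > 5

  // IterativeCondition analogue: also looks at the events already accepted by the pattern
  def iterative(value: Double, accepted: Seq[Double]): Boolean =
    accepted.isEmpty || value > accepted.sum / accepted.size

  /** Feeds events into a looping pattern; events failing the condition are skipped. */
  def acceptLoop(events: Seq[Double]): Seq[Double] =
    events.foldLeft(Vector.empty[Double]) { (acc, v) =>
      if (iterative(v, acc)) acc :+ v else acc
    }
}
```

For the input 4, 6, 3, 7 the loop accepts 4, then 6 (greater than 4), skips 3 (not greater than 5), and accepts 7, giving 4, 6, 7.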
FlinkCEP supports three kinds of contiguity between events:
- next(): strict contiguity
  Example: for the pattern begin("first").where(_.name == "a").next("second").where(_.name == "b"), only the input a, b matches. If the input is a, c, b, the c directly after a causes a to be discarded, and the pattern does not match.
- followedBy(): relaxed contiguity
  Example: for the pattern begin("first").where(_.name == "a").followedBy("second").where(_.name == "b"), both a, b and a, c, b match; the c in between is ignored.
- followedByAny(): non-deterministic relaxed contiguity
  Example: for the pattern begin("first").where(_.name == "a").followedByAny("second").where(_.name == "b") and the input a, c, b, b, followedBy produces one match, {a, b}, whereas followedByAny produces two matches, {a, b} and {a, b}, one for each b.
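The three contiguity modes can be compared on the example inputs above with a small plain-Scala model. It only handles a two-step pattern "first, then second" over string events, returns index pairs so that the two b's in a, c, b, b stay distinguishable, and is a sketch of the semantics under those assumptions rather than Flink's implementation.

```scala
object ContiguitySketch {
  sealed trait Contiguity
  case object Strict extends Contiguity        // next()
  case object Relaxed extends Contiguity       // followedBy()
  case object NonDetRelaxed extends Contiguity // followedByAny()

  /** All (indexOfFirst, indexOfSecond) pairs matching "first, then second" under the given contiguity. */
  def matches(events: Seq[String], first: String, second: String, c: Contiguity): Seq[(Int, Int)] =
    events.indices.filter(i => events(i) == first).flatMap { i =>
      // Positions after i that hold the second event
      val candidates = (i + 1 until events.size).filter(j => events(j) == second)
      val chosen = c match {
        case Strict        => candidates.filter(_ == i + 1) // must be the very next event
        case Relaxed       => candidates.take(1)             // skip non-matching events, stop at the first match
        case NonDetRelaxed => candidates                     // every later match counts
      }
      chosen.map(j => (i, j))
    }
}
```

On a, c, b the strict mode finds nothing and the relaxed mode finds (0, 2); on a, c, b, b the non-deterministic mode finds both (0, 2) and (0, 3).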
2. Quantifier APIs
Recall the pattern concept introduced above: patterns are singleton patterns by default and can be turned into looping patterns with quantifiers. The quantifier APIs do exactly that.
The following quantifier APIs turn a pattern into a looping pattern:
- pattern.oneOrMore(): the given event occurs one or more times, like the B+ above.
- pattern.times(#ofTimes): the given event occurs exactly the specified number of times, for example 4.
- pattern.times(#fromTimes, #toTimes): the given event occurs a number of times within the specified range, for example 2 to 4.
- pattern.greedy(): makes a looping pattern greedy, i.e. it repeats as many times as possible. It applies only to quantified patterns and is not supported on group patterns.
- pattern.optional(): makes a pattern optional, i.e. it may not occur at all. It works on singleton patterns and together with any of the quantifiers above.
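One way to pin down what each quantifier allows is a function from quantifier to accepted occurrence counts. The ADT below is a hypothetical model, not Flink's own Quantifier class. greedy() is left out because it changes which of the acceptable counts is preferred, not which counts are acceptable.

```scala
object QuantifierSketch {
  sealed trait Quantifier
  case object OneOrMore extends Quantifier                           // pattern.oneOrMore
  final case class Times(n: Int) extends Quantifier                  // pattern.times(n)
  final case class TimesRange(from: Int, to: Int) extends Quantifier // pattern.times(from, to)
  final case class Optional(inner: Quantifier) extends Quantifier    // ... followed by .optional

  /** Whether a pattern with this quantifier can accept exactly `count` events. */
  def accepts(q: Quantifier, count: Int): Boolean = q match {
    case OneOrMore        => count >= 1
    case Times(n)         => count == n
    case TimesRange(f, t) => count >= f && count <= t
    case Optional(inner)  => count == 0 || accepts(inner, count)
  }
}
```

For instance, times(4) made optional accepts either 0 or 4 occurrences, but not 2.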
3. After-match skip strategies
An after-match skip strategy decides which of several successful matches are kept. In other words, when one input produces many overlapping matches and you do not need all of them, the skip strategy filters out the extra ones.
Flink provides five skip strategies:
- NO_SKIP: no filtering; every possible match is emitted.
- SKIP_TO_NEXT: discards every partial match that started with the same event as the emitted match.
- SKIP_PAST_LAST_EVENT: discards every partial match that started after the emitted match started but before it ended.
- SKIP_TO_FIRST[PatternName]: discards every partial match that started after the emitted match started but before the first event matched by the pattern PatternName.
- SKIP_TO_LAST[PatternName]: discards every partial match that started after the emitted match started but before the last event matched by the pattern PatternName.
Let's use NO_SKIP and SKIP_PAST_LAST_EVENT as examples.
Take the pattern begin("start").where(_.name == "a").oneOrMore().followedBy("second").where(_.name == "b") and the input a, a, a, a, b. With NO_SKIP (no filtering), the emitted matches include {a, b}, {a, a, b}, {a, a, a, b} and {a, a, a, a, b}. With SKIP_PAST_LAST_EVENT, partial matches that started after the first match started but before it ended are discarded, so only {a, a, a, a, b} remains.
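The effect of these two strategies can be modeled as a filter over finished matches. This is a simplification: Flink applies the strategy inside the NFA by pruning partial matches, and the strategy is chosen when the pattern is created, for example Pattern.begin("start", AfterMatchSkipStrategy.skipPastLastEvent()). The sketch represents each match by the positions of its first and last event and assumes that matches ending on the same event are emitted earliest-start first.

```scala
object SkipStrategySketch {
  /** A finished match, identified by the stream positions of its first and last event. */
  final case class Match(start: Int, end: Int)

  /** NO_SKIP: emit every match (ordered by end position, then start position). */
  def noSkip(ms: Seq[Match]): Seq[Match] = ms.sortBy(m => (m.end, m.start))

  /** SKIP_PAST_LAST_EVENT: after emitting a match, drop matches that start at or before its last event. */
  def skipPastLastEvent(ms: Seq[Match]): Seq[Match] =
    noSkip(ms).foldLeft((Vector.empty[Match], -1)) { case ((kept, lastEnd), m) =>
      if (m.start > lastEnd) (kept :+ m, m.end) else (kept, lastEnd)
    }._1
}
```

For a, a, a, a, b (positions 0 to 4), the four matches listed above correspond to Match(0, 4) through Match(3, 4). noSkip keeps all four, while skipPastLastEvent keeps only Match(0, 4), i.e. {a, a, a, a, b}.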
Use cases of Flink CEP
Besides the scenario above, Flink CEP is widely used in fraud detection, fault detection, risk control, intelligent marketing and other fields.
1. Real-time anti-cheating and risk control
E-commerce platforms inevitably attract coupon-hunting fraud rings (the so-called "wool party"). Pinduoduo once exposed a 100-yuan no-threshold coupon and was reportedly fleeced of tens of billions of yuan that night; situations like this cannot be handled without timely risk control. Merchants also cheat in many ways when listing products: frequently renaming products and stuffing titles with keywords to climb the search rankings, or registering batches of bot accounts to inflate sales. Each of these cheating methods requires rules that are continually updated to match the behavior.
2. Real-time marketing
Analyze users' real-time behavior in the mobile app, work out their activity cycles, and make recommendations based on user profiles. For example: within 1 minute of logging in to the app, a user only browses products without placing an order; after browsing a product, the user looks at similar products within 3 minutes to compare prices; or whether the user pays within 1 minute of placing an order. Used well, this data lets the platform recommend items similar to those the user has viewed, which can significantly increase the purchase rate.
3. Real-time network attack detection
Internet security remains a serious concern, and network attacks are common and diverse. Here we use the incoming traffic generated by DDoS (distributed denial-of-service) attacks as the basis for detecting attacks, so that potential attacks on the network are detected and alerted on in real time. A cloud provider's data centers periodically report their instantaneous traffic to a monitoring center. If the traffic is within the preset normal range, nothing happens. If a data center reports traffic above the normal threshold 5 consecutive times within 10 seconds, a warning event is triggered; if it does so 30 consecutive times within 30 seconds, a critical alarm is raised.
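The warning rule can be sketched in plain Scala over a batch of reports. This is an offline check under assumptions (a hypothetical Report type with a timestamp in milliseconds, and the span measured from the first to the last of the n consecutive reports), not a streaming job. In Flink CEP the same rule could be expressed roughly as a where condition on the threshold followed by times(5).consecutive() and within(Time.seconds(10)), on a stream keyed by data center.

```scala
object TrafficAlertSketch {
  // Hypothetical report type: timestamp ts in milliseconds
  final case class Report(dataCenter: String, bytesPerSec: Long, ts: Long)

  /** Data centers with n consecutive reports above threshold spanning less than windowMs. */
  def alarmed(reports: Seq[Report], threshold: Long, n: Int, windowMs: Long): Set[String] =
    reports.groupBy(_.dataCenter).collect {
      case (dc, rs) if rs.sortBy(_.ts).sliding(n).exists { w =>
          w.size == n && w.forall(_.bytesPerSec > threshold) && w.last.ts - w.head.ts < windowMs
        } => dc
    }.toSet
}
```

Calling it with n = 5 and windowMs = 10000 expresses the warning rule; n = 30 and windowMs = 30000 expresses the critical-alarm rule.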
A brief introduction to how Flink CEP works
Apache Flink implements CEP with the NFA model described in the paper "Efficient Pattern Matching over Event Streams". The paper also describes several optimizations, which we skip here; we only cover the concept of an NFA.
An NFA (Nondeterministic Finite Automaton) has a finite number of states, but from a given state it may move to several different states (nondeterminism).
Figure: Nondeterministic finite automaton
Let me introduce two concepts:
- State: there are three kinds of states: the start state, intermediate states, and the final state.
- Transition: take, ignore and proceed are the names of the transitions between states.
Matching in an NFA is essentially a process of state transitions. The three transitions mean the following:
- take: evaluates the condition. When an event arrives and satisfies the condition, the event is taken into the result set and the current state moves to the next state.
- proceed: the current state moves to the next state without consuming any event, i.e. a pass-through transition.
- ignore: when an event arrives, it can be ignored; the current state stays unchanged, which amounts to a transition back to itself.
Characteristics of an NFA: given the current state, there may be several possible next states; the next state can be chosen arbitrarily, or all candidates can be followed in parallel; and a transition may happen on an empty input.
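The three transitions can be seen at work in a toy NFA for the pattern a, followedBy b+, followedBy c. This is a hand-rolled sketch, not Flink's NFA classes: it keeps a set of partial matches, starts a new one on every event, lets ignore fire only on events that fail the state's condition, and treats proceed as a move from the loop state to the c state on the same event. Flink's compiled NFA has more edges and bookkeeping (shared buffer, timeouts), so its match set for a given input can differ from this sketch.

```scala
object NfaSketch {
  sealed trait St
  case object A extends St     // waiting for "a"
  case object B1 extends St    // need the first "b"; other events are ignored
  case object BLoop extends St // inside b+: take more b's, ignore others, or proceed to C
  case object C extends St     // waiting for "c"; other events are ignored

  type Partial = (St, Vector[String])
  private val noMatch = Set.empty[Vector[String]]

  private def isA(e: String) = e.startsWith("a")
  private def isB(e: String) = e.startsWith("b")
  private def isC(e: String) = e.startsWith("c")

  /** Applies one event to one partial match; returns surviving partials and completed matches. */
  private def step(p: Partial, e: String): (Set[Partial], Set[Vector[String]]) = {
    val (st, taken) = p
    st match {
      case A  => (if (isA(e)) Set[Partial]((B1, taken :+ e)) else Set.empty[Partial], noMatch) // take or drop
      case B1 => (Set[Partial](if (isB(e)) (BLoop, taken :+ e) else (B1, taken)), noMatch)    // take or ignore
      case BLoop =>
        val stay: Partial = if (isB(e)) (BLoop, taken :+ e) else (BLoop, taken) // take or ignore
        val (viaProceed, done) = step((C, taken), e)                            // proceed to C on the same event
        (viaProceed + stay, done)
      case C =>
        if (isC(e)) (Set.empty[Partial], Set(taken :+ e)) // take: the match is complete
        else (Set[Partial]((C, taken)), noMatch)           // ignore
    }
  }

  /** Runs the pattern over the events and returns every completed match. */
  def run(events: Seq[String]): Set[Vector[String]] = {
    var partials = Set.empty[Partial]
    var matches = noMatch
    for (e <- events) {
      val results = (partials + ((A, Vector.empty[String]))).toSeq.map(step(_, e))
      partials = results.flatMap(_._1).toSet
      matches ++= results.flatMap(_._2)
    }
    matches
  }
}
```

For the input a, d, b, c the d is ignored and the single match is {a, b, c}; for a, b1, b2, c the proceed edge out of the loop yields both {a, b1, c} and {a, b1, b2, c}.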
Rules engines
A rules engine separates business decisions from application code and expresses them with predefined semantic modules. It accepts input data, interprets the business rules, and makes business decisions according to those rules.
By reducing the complexity of the components that implement complex business logic, a rules engine lowers the cost of maintaining and extending an application.
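The core idea, rules kept as data and evaluated by a generic engine instead of being hard-coded as if statements, fits in a few lines. TinyRuleEngine, Barrage and Rule are hypothetical names for this sketch; real engines such as Drools or EasyRules add their own rule languages, rule ordering and, in some cases, dynamic reloading.

```scala
object TinyRuleEngine {
  final case class Barrage(userId: String, msg: String)

  // A rule is plain data: a name, a condition, and the decision to take when it fires
  final case class Rule(name: String, condition: Barrage => Boolean, decision: String)

  /** Evaluates every rule against the input and returns the decisions of the rules that fire. */
  def evaluate(rules: Seq[Rule], b: Barrage): Seq[String] =
    rules.filter(_.condition(b)).map(_.decision)
}
```

Because the rules are ordinary values, they could be built from configuration at runtime while the evaluation code stays unchanged.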
1. Drools
Drools is an open-source rules engine written in Java, typically used to separate business code from business rules. Its built-in Drools Fusion module also provides CEP functionality.
Advantages:
- A fairly complete feature set, including system monitoring and an operations platform.
- Rules can be updated dynamically.
Disadvantages:
- Time windows are implemented in memory, so windows with a long time span are not supported.
- Timed triggers are not well supported (e.g., deciding whether to reach out to a user some time after a browse event).
2. Aviator
Aviator is a high-performance, lightweight expression evaluation engine for Java, mainly used to evaluate all kinds of expressions dynamically.
Advantages:
- Supports most operators.
- Supports function calls and custom functions.
- Supports regular expression matching.
- Supports passing in variables, with excellent performance.
Disadvantages:
- No if/else or do/while statements, no assignment statements, and no bitwise operators.
3. EasyRules
EasyRules is a lightweight rules engine that integrates MVEL and SpEL expressions.
Advantages:
- Lightweight framework with a low learning curve.
- POJO-based.
- Provides useful abstractions for defining business rules and applying them easily.
- Supports composing simple rules into complex ones.
4. Esper
Esper is designed as a lightweight CEP solution that can easily be embedded in services to provide CEP functionality.
Advantages:
- Lightweight and embeddable; common CEP features are simple and easy to use.
- EPL syntax is similar to SQL, so it is relatively cheap to learn.
Disadvantages:
- It is a single-machine, fully in-memory solution, so distribution and storage have to be integrated separately.
- Time windows are implemented in memory, so windows with a long time span are not supported.
- Timed triggers are not well supported (e.g., deciding whether to reach out to a user some time after a browse event).
5. Flink CEP
Flink is a high-throughput, low-latency stream processing system, and Flink CEP is a general-purpose, easy-to-use solution for real-time event processing on streams.
Advantages:
- Inherits Flink's high throughput.
- Events can be kept in external state storage, so long time windows are supported.
- Timed triggers are supported (followedBy + PatternTimeoutFunction).
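What a timeout handler receives can be illustrated without Flink: a partial match (a browse) whose follow-up (an order) does not arrive in time. The sketch below is a hypothetical batch model with an Event type invented for illustration. In Flink CEP, such timed-out partial matches are handed to the timeout function given to select once time passes the within() window, which is what makes timed triggers possible.

```scala
object TimedTriggerSketch {
  // Hypothetical event type: kind is "browse" or "order"; ts is in milliseconds
  final case class Event(userId: String, kind: String, ts: Long)

  /** Users with a "browse" not followed by an "order" within timeoutMs: candidates for a timed trigger. */
  def timedOut(events: Seq[Event], timeoutMs: Long): Set[String] =
    events.groupBy(_.userId).collect {
      case (user, evs) if evs.exists { b =>
          b.kind == "browse" &&
            !evs.exists(o => o.kind == "order" && o.ts > b.ts && o.ts - b.ts < timeoutMs)
        } => user
    }.toSet
}
```

With a 60-second timeout, a user who orders 30 seconds after browsing is not reported, while a user who orders after 2 minutes, or never, is.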