1. Record that the Flink CEP Pattern is not triggered
1.1. Problem description
1: I set up the Watermark to use the event time
2: Then use StreamTableEnvironment to register the View, use SQL to do aggregation calculation
3: Then convert the table to a stream
4: Then define Flink CEP Pattern to match the data flow
5: It turns out that the CEP has not been triggered
The code is as follows:
// Real-time data flow
DataStreamSource<String> dataSource = env.addSource(kafkaConsumer);
// Map data streams to data entities
DataStream<CleanBean> cleanBeanDataStream = dataSource.map(new CleanBeanMap());
// Set the water level
DataStream<CleanBean> waterData = cleanBeanDataStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.
<CleanBean>forBoundedOutOfOrderness(Duration.ofSeconds(/*Long.valueOf(ProjectConfig.config.getProperty("delay.time"))*/0))
.withTimestampAssigner((cleanBean, l) -> (cleanBean.getEventTime()))
);
// Register (stream)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.createTemporaryView("TableDemo", mapData, $("tableName") and $("fDouble") and $("fLong") and $("fString") and $("fBoolean") and $("eventTime").rowtime());
// Execute the query
Table table = tEnv.sqlQuery("select tableName,avg(fDouble) as temperature from TableDemo group by tableName, TUMBLE(eventTime,INTERVAL '2' SECOND)");
/ / table
DataStream<WarnTemperatureBean> streamData = tEnv.toAppendStream(table, WarnTemperatureBean.class);
// Define pattern matching rules
Pattern<WarnTemperatureBean, WarnTemperatureBean> pattern = Pattern.<WarnTemperatureBean>begin("start").where(new SimpleCondition<WarnTemperatureBean>() {
@Override
public boolean filter(WarnTemperatureBean value) throws Exception {
return true; }});// Apply pattern matching to the data stream
CEP.pattern(streamData.keyBy(WarnTemperatureBean::getTableName), pattern)
.select(new PatternSelectFunction<WarnTemperatureBean, Object>() {
@Override
public Object select(Map<String, List<WarnTemperatureBean>> map) throws Exception {
List<WarnTemperatureBean> next = map.get("next");
if(next ! =null && next.size() > 0) {
// Send an alarm email
System.out.println("Alarm Data:"+next);
}
return next;
}
}).print("=========== Rule matching extraction data ===========");
Copy the code
1.2. Solutions
The water level needs to be reset. CEP is triggered when the water level is reset.
At present, it is suspected that the water line will be lost when Table is obtained after Flink SQL processing and then converted into Stream.
Gentlemen have their own views welcome to discuss.