1. Troubleshooting notes: a Flink CEP Pattern that is never triggered

1.1. Problem description

1: I assign timestamps and watermarks so that the job runs on event time

2: I then register a view with StreamTableEnvironment and run an aggregation in SQL

3: The resulting table is converted back into a stream

4: A Flink CEP Pattern is defined to match that stream

5: The CEP pattern is never triggered

The code is as follows:

// Real-time data flow
DataStreamSource<String> dataSource = env.addSource(kafkaConsumer);

// Map data streams to data entities
DataStream<CleanBean> cleanBeanDataStream = dataSource.map(new CleanBeanMap());

// Assign timestamps and watermarks (event time)
DataStream<CleanBean> waterData = cleanBeanDataStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.
                        <CleanBean>forBoundedOutOfOrderness(Duration.ofSeconds(/*Long.valueOf(ProjectConfig.config.getProperty("delay.time"))*/0))
                        .withTimestampAssigner((cleanBean, l) -> (cleanBean.getEventTime()))
        );

// Register the watermarked stream as a view
// ($ is statically imported from org.apache.flink.table.api.Expressions)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.createTemporaryView("TableDemo", waterData, $("tableName"), $("fDouble"), $("fLong"), $("fString"), $("fBoolean"), $("eventTime").rowtime());

// Execute the query
Table table = tEnv.sqlQuery("select tableName,avg(fDouble) as temperature from TableDemo group by tableName, TUMBLE(eventTime,INTERVAL '2' SECOND)");

// Convert the table to a stream
DataStream<WarnTemperatureBean> streamData = tEnv.toAppendStream(table, WarnTemperatureBean.class);

// Define pattern matching rules
Pattern<WarnTemperatureBean, WarnTemperatureBean> pattern = Pattern.<WarnTemperatureBean>begin("start")
        .where(new SimpleCondition<WarnTemperatureBean>() {
            @Override
            public boolean filter(WarnTemperatureBean value) throws Exception {
                // Accept every element, so any event should produce a match
                return true;
            }
        });

// Apply pattern matching to the data stream
CEP.pattern(streamData.keyBy(WarnTemperatureBean::getTableName), pattern)
                .select(new PatternSelectFunction<WarnTemperatureBean, Object>() {
                    @Override
                    public Object select(Map<String, List<WarnTemperatureBean>> map) throws Exception {
                        // The key must be the name passed to begin(), which is "start" here
                        List<WarnTemperatureBean> matched = map.get("start");
                        if (matched != null && matched.size() > 0) {
                            // Send an alarm email
                            System.out.println("Alarm Data:" + matched);
                        }
                        return matched;
                    }
                }).print("=========== Rule matching extraction data ===========");


1.2. Solutions

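The watermark needs to be assigned again on the stream obtained from the table. Once assignTimestampsAndWatermarks is called on streamData before it is passed to CEP.pattern, the CEP pattern is triggered. The timestamp assigner needs a time value to read from each WarnTemperatureBean; the query above does not select one, so a field such as the window end time would presumably have to be added (this is an assumption, since the code for this step is not shown).

My current suspicion is that the watermark is lost when the Table produced by Flink SQL is converted back into a DataStream.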
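Why a missing watermark would keep CEP silent can be illustrated with a small model. In event time, Flink CEP buffers incoming elements and only processes them once the watermark has reached their timestamp, so a stream whose watermark never advances never produces a match. The sketch below is a simplified plain-Java model of that buffering, not Flink's actual implementation; the class EventTimeBufferSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Simplified model of an event-time operator that holds events back until a watermark passes them. */
final class EventTimeBufferSketch {

    /** A timestamped event; the fields are hypothetical and only loosely mirror the article's bean. */
    static final class Event {
        public final String tableName;
        public final long eventTime;

        public Event(String tableName, long eventTime) {
            this.tableName = tableName;
            this.eventTime = eventTime;
        }
    }

    // Events wait here, ordered by timestamp, until a watermark releases them.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));

    // No watermark has been seen yet.
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an event. Nothing is matched here; late events are simply dropped in this sketch. */
    public void onEvent(Event event) {
        if (event.eventTime > currentWatermark) {
            buffer.add(event);
        }
    }

    /** Advances the watermark and returns the events that may now be processed, in timestamp order. */
    public List<Event> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Event> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().eventTime <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    /** Number of events still waiting for a watermark. */
    public int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        EventTimeBufferSketch operator = new EventTimeBufferSketch();
        operator.onEvent(new Event("TableDemo", 2000L));
        operator.onEvent(new Event("TableDemo", 1000L));

        // Without any watermark, both events stay buffered and nothing can match.
        System.out.println("buffered without watermark: " + operator.buffered());

        // Once a watermark arrives, the events at or before it are handed to pattern matching.
        System.out.println("released by watermark 1500: " + operator.onWatermark(1500L).size());
        System.out.println("still buffered: " + operator.buffered());
    }
}
```

In this model, events that arrive without a later watermark stay in the buffer indefinitely, which matches the symptom described above. Re-assigning watermarks on the converted stream is what lets the buffer drain.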

If you have a different explanation, you are welcome to discuss it.