Learning Flink CEP: sample code and three pitfalls

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        List<LoginEvent> loginEvents = new ArrayList<>();
        loginEvents.add(new LoginEvent(1, 1, "127.0.0.1", "success"));
        loginEvents.add(new LoginEvent(2, 2, "127.0.0.2", "fail"));
        loginEvents.add(new LoginEvent(3, 1, "127.0.0.2", "success"));
        loginEvents.add(new LoginEvent(4, 2, "127.0.0.1", "fail"));
        loginEvents.add(new LoginEvent(5, 1, "127.0.0.1", "success"));
        loginEvents.add(new LoginEvent(6, 3, "127.0.0.2", "fail"));
        loginEvents.add(new LoginEvent(7, 3, "127.0.0.1", "success"));
        loginEvents.add(new LoginEvent(8, 4, "127.0.0.1", "fail"));
        loginEvents.add(new LoginEvent(9, 1, "127.0.0.2", "success"));
        loginEvents.add(new LoginEvent(10, 4, "127.0.0.1", "fail"));
    


        DataStreamSource<LoginEvent> dataStreamSource = environment.fromCollection(loginEvents);

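        // Match two or more "fail" events for the same key (relaxed contiguity by
        // default for looping patterns); skipPastLastEvent() resumes the search
        // after the last event of a match, so matched events are not reused.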
        Pattern<LoginEvent, LoginEvent> pattern = Pattern
                .<LoginEvent>begin("one", AfterMatchSkipStrategy.skipPastLastEvent())
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equals(loginEvent.getType());
                    }
                }).timesOrMore(2);


        PatternStream<LoginEvent> patternStream = CEP.pattern(
                dataStreamSource
                        .keyBy(LoginEvent::getUserId),
                pattern);
        // The lambda-based select() below triggers the InvalidTypesException
        // discussed in problem 1, so it is left commented out:
        // patternStream
        //         .select((PatternSelectFunction<LoginEvent, String>) map -> {
        //             return map.toString();
        //         }).print();

        patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
            @Override
            public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
                collector.collect(map.get("one").toString());
            }
        }).print();

        environment.execute("TRUTH_TEST_TRUTH1");

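The LoginEvent POJO itself is not shown in the original. A minimal sketch that matches the constructor calls above and the printed output below (the field names id, userId, ip, and type are inferred, not confirmed by the source):

// Minimal LoginEvent sketch; constructor order (id, userId, ip, type) and
// the toString() format are inferred from the sample and its output.
public class LoginEvent {
    private long id;
    private long userId;
    private String ip;
    private String type;

    public LoginEvent(long id, long userId, String ip, String type) {
        this.id = id;
        this.userId = userId;
        this.ip = ip;
        this.type = type;
    }

    public long getId() { return id; }
    public long getUserId() { return userId; }
    public String getIp() { return ip; }
    public String getType() { return type; }

    @Override
    public String toString() {
        return "LoginEvent(id=" + id + ", userId=" + userId + ", ip=" + ip + ", type=" + type + ")";
    }
}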

1. Lambda expression problem

At first I used lambda expressions throughout the demo, but when I ran it the following exception occurred:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.cep.PatternSelectFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1239)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1263)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1226)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:789)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
	at org.apache.flink.cep.PatternStream.select(PatternStream.java:132)
	at com.truth.flinkdemo.flink.FlinkCEPKt.main(FlinkCEP.kt:59)
	at com.truth.flinkdemo.flink.FlinkCEPKt.main(FlinkCEP.kt)

The parameter and return types of a lambda do not need to be declared because the Java compiler infers them. However, the compiler erases the generic type information when it compiles a lambda, so Flink cannot extract the output type from the method signature the way it can for a class that implements the interface. The official documentation offers the following solutions:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Option 1: use Flink's returns(...) method to declare the concrete type
env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))
    .returns(Types.TUPLE(Types.INT, Types.INT))
    .print();

// Option 2: write an implementation class for the corresponding interface
env.fromElements(1, 2, 3)
    .map(new MyTuple2Mapper())
    .print();

public static class MyTuple2Mapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Integer i) {
        return Tuple2.of(i, i);
    }
}

// Option 3: use an anonymous inner class (personally preferred, as in the sample code)
env.fromElements(1, 2, 3)
    .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> map(Integer i) {
            return Tuple2.of(i, i);
        }
    })
    .print();

// Option 4: use a Tuple2 subclass instead
env.fromElements(1, 2, 3)
    .map(i -> new DoubleTuple(i, i))
    .print();

public static class DoubleTuple extends Tuple2<Integer, Integer> {
    public DoubleTuple(int f0, int f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}
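For the CEP demo, the anonymous inner class (option 3) is what the sample code above uses. If you would rather keep the lambda, a sketch of another route: PatternStream exposes a select() overload that takes the output TypeInformation explicitly (assuming that two-argument overload is available in your Flink version), which sidesteps the failing type extraction:

import org.apache.flink.cep.PatternSelectFunction;

// Sketch: keep the lambda and supply the output type explicitly via the
// select(selectFn, outTypeInfo) overload, so Flink never has to infer
// the return type from the lambda. Assumes Flink 1.12-era CEP API.
patternStream
        .select(
                (PatternSelectFunction<LoginEvent, String>) map -> map.toString(),
                Types.STRING)
        .print();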

2. Flink 1.12.3: Could not create actor system

Exception in thread "main" java.lang.Exception: Could not create actor system
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:281)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:361)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
	at org.apache.flink.runtime.minicluster.MiniCluster.createLocalRpcService(MiniCluster.java:952)
	at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:288)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)
	at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
	at org.truth.flink.FlinkCep.main(FlinkCep.java:104)
Caused by: java.lang.NoClassDefFoundError: akka/actor/ExtensionId$class
	at org.apache.flink.runtime.akka.RemoteAddressExtension$.<init>(RemoteAddressExtension.scala:32)
	at org.apache.flink.runtime.akka.RemoteAddressExtension$.<clinit>(RemoteAddressExtension.scala)
	at org.apache.flink.runtime.akka.AkkaUtils$.getAddress(AkkaUtils.scala:804)
	at org.apache.flink.runtime.akka.AkkaUtils.getAddress(AkkaUtils.scala)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:298)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:279)
	... 11 more
Caused by: java.lang.ClassNotFoundException: akka.actor.ExtensionId$class
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 17 more


The reason for the above exception is a mistake in the Flink 1.12.3 release process that caused the Scala 2.12 artifacts to accidentally contain Scala 2.11 code. Scala 2.12 users should not use 1.12.3 at all; upgrading directly to 1.12.4 makes the problem disappear. See the original release announcement for details.

3. No console output from the sample code after upgrading to 1.12+

With Flink versions 1.9.3 through 1.11.3, the sample code prints the matched sequences normally:

8> [LoginEvent(id=2, userId=2, ip=127.0.0.2, type=fail), LoginEvent(id=4, userId=2, ip=127.0.0.1, type=fail)]
1> [LoginEvent(id=8, userId=4, ip=127.0.0.1, type=fail), LoginEvent(id=10, userId=4, ip=127.0.0.1, type=fail)]

But when run on Flink 1.12, the same job produced no output at all. The reason is that Flink 1.12 changed the default stream time characteristic to TimeCharacteristic.EventTime; in other words, the default moved from processing time to event time, and that is the problem: in event time the CEP operator buffers incoming events and only processes them as watermarks advance, and the sample supplies neither timestamps nor watermarks.
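One way out (not the route taken below) is to stay in event time and supply timestamps and watermarks yourself. A minimal sketch, assuming the LoginEvent id can stand in for a monotonically increasing timestamp in this bounded demo:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: assign timestamps and watermarks so the event-time CEP operator
// can make progress. Using the event id as the timestamp is an assumption
// that only makes sense for this bounded, ordered demo data.
DataStream<LoginEvent> withTimestamps = dataStreamSource
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<LoginEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<LoginEvent>)
                                        (event, recordTimestamp) -> event.getId()));

PatternStream<LoginEvent> eventTimePatternStream =
        CEP.pattern(withTimestamps.keyBy(LoginEvent::getUserId), pattern);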

The simpler route is to switch back to processing time. However, StreamExecutionEnvironment.setStreamTimeCharacteristic() is deprecated in Flink 1.12, so calling setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) to change the default is discouraged. Looking at the source code of the PatternStream class instead, you will find the following methods:


/** Sets the time characteristic to processing time. */
public PatternStream<T> inProcessingTime() {
    return new PatternStream<>(builder.inProcessingTime());
}

/** Sets the time characteristic to event time. */
public PatternStream<T> inEventTime() {
    return new PatternStream<>(builder.inEventTime());
}

As the comments indicate, inProcessingTime() switches the PatternStream to processing time and inEventTime() switches it to event time. Modifying the sample code as follows restores the output shown earlier:

// Before the change
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
    @Override
    public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
        collector.collect(map.get("one").toString());
    }
}).print();


// After the change
patternStream
        .inProcessingTime()
        .process(new PatternProcessFunction<LoginEvent, String>() {
            @Override
            public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
                collector.collect(map.get("one").toString());
            }
        })
        .print();