Learn Flink CEP sample code
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
List<LoginEvent> loginEvents = new ArrayList<>();
loginEvents.add(new LoginEvent(1, 1, "127.0.0.1", "success"));
loginEvents.add(new LoginEvent(2, 2, "127.0.0.2", "fail"));
loginEvents.add(new LoginEvent(3, 1, "127.0.0.2", "success"));
loginEvents.add(new LoginEvent(4, 2, "127.0.0.1", "fail"));
loginEvents.add(new LoginEvent(5, 1, "127.0.0.1", "success"));
loginEvents.add(new LoginEvent(6, 3, "127.0.0.2", "fail"));
loginEvents.add(new LoginEvent(7, 3, "127.0.0.1", "success"));
loginEvents.add(new LoginEvent(8, 4, "127.0.0.1", "fail"));
loginEvents.add(new LoginEvent(9, 1, "127.0.0.2", "success"));
loginEvents.add(new LoginEvent(10, 4, "127.0.0.1", "fail"));
DataStreamSource<LoginEvent> dataStreamSource = environment.fromCollection(loginEvents);
// Match two or more "fail" events for the same user
Pattern<LoginEvent, LoginEvent> pattern = Pattern
        .<LoginEvent>begin("one", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return "fail".equals(loginEvent.getType());
            }
        }).timesOrMore(2);
PatternStream<LoginEvent> patternStream = CEP.pattern(
        dataStreamSource.keyBy(LoginEvent::getUserId),
        pattern);
// The lambda version below triggers the type-inference exception discussed in section 1:
// patternStream
//         .select((PatternSelectFunction<LoginEvent, String>) map -> {
//             return map.toString();
//         }).print();
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
    @Override
    public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
        collector.collect(map.get("one").toString());
    }
}).print();
environment.execute("TRUTH_TEST_TRUTH1");
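The sample assumes a LoginEvent POJO that is not shown above. Here is a minimal sketch of it; the field names, types, and the Lombok-style toString() are inferred from the constructor calls and the console output later in this post, so treat it as an assumption rather than the original class:

// Assumed POJO for the sample above; fields inferred from
// new LoginEvent(id, userId, ip, type) and the printed output.
public class LoginEvent {
    private long id;
    private long userId;
    private String ip;
    private String type;

    // Flink's POJO serializer needs a public no-arg constructor and getters/setters
    public LoginEvent() {}

    public LoginEvent(long id, long userId, String ip, String type) {
        this.id = id;
        this.userId = userId;
        this.ip = ip;
        this.type = type;
    }

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public long getUserId() { return userId; }
    public void setUserId(long userId) { this.userId = userId; }
    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    @Override
    public String toString() {
        return "LoginEvent(id=" + id + ", userId=" + userId + ", ip=" + ip + ", type=" + type + ")";
    }
}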
1. Lambda expression problem
At first I used lambda expressions throughout the demo, but when I ran it, the following exception occurred:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.cep.PatternSelectFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1239)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1263)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1226)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:789)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
	at org.apache.flink.cep.PatternStream.select(PatternStream.java:132)
	at com.truth.flinkdemo.flink.FlinkCEPKt.main(FlinkCEP.kt:59)
	at com.truth.flinkdemo.flink.FlinkCEPKt.main(FlinkCEP.kt)
The parameter and return types of a lambda expression do not have to be declared, because the Java compiler infers them. Flink can usually extract the result type from a method's signature, but the compiler erases the generic type information of a lambda, so Flink cannot automatically infer its output type. The official documentation offers the following solutions:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Option 1: use Flink's returns(...) method to declare the result type explicitly
env.fromElements(1, 2, 3)
        .map(i -> Tuple2.of(i, i))
        .returns(Types.TUPLE(Types.INT, Types.INT))
        .print();

// Option 2: write an implementation class of the corresponding interface
env.fromElements(1, 2, 3)
        .map(new MyTuple2Mapper())
        .print();

public static class MyTuple2Mapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Integer i) {
        return Tuple2.of(i, i);
    }
}

// Option 3: use an anonymous inner class (personally preferred, as in the sample code)
env.fromElements(1, 2, 3)
        .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Integer i) {
                return Tuple2.of(i, i);
            }
        })
        .print();

// Option 4: use a Tuple2 subclass instead
env.fromElements(1, 2, 3)
        .map(i -> new DoubleTuple(i, i))
        .print();

public static class DoubleTuple extends Tuple2<Integer, Integer> {
    public DoubleTuple(int f0, int f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}
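The first option also maps onto the CEP code at the top of this post: PatternStream exposes select/flatSelect overloads that take the output TypeInformation explicitly. A minimal sketch of fixing the commented-out lambda this way, assuming the select(PatternSelectFunction, TypeInformation) overload and reusing the sample's patternStream:

// Pass the output type explicitly so Flink does not have to infer
// the erased lambda type
patternStream
        .select(map -> map.get("one").toString(), Types.STRING)
        .print();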
2. Flink 1.12.3: Could not create actor system
Exception in thread "main" java.lang.Exception: Could not create actor system
at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:281)
at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:361)
at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
at org.apache.flink.runtime.minicluster.MiniCluster.createLocalRpcService(MiniCluster.java:952)
at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:288)
at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at org.truth.flink.FlinkCep.main(FlinkCep.java:104)
Caused by: java.lang.NoClassDefFoundError: akka/actor/ExtensionId$class
	at org.apache.flink.runtime.akka.RemoteAddressExtension$.<init>(RemoteAddressExtension.scala:32)
	at org.apache.flink.runtime.akka.RemoteAddressExtension$.<clinit>(RemoteAddressExtension.scala)
	at org.apache.flink.runtime.akka.AkkaUtils$.getAddress(AkkaUtils.scala:804)
	at org.apache.flink.runtime.akka.AkkaUtils.getAddress(AkkaUtils.scala)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:298)
	at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:279)
	... 11 more
Caused by: java.lang.ClassNotFoundException: akka.actor.ExtensionId$class
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 17 more
The reason for the above exception is a mistake in the Flink 1.12.3 release process, which caused the Scala 2.12 artifacts to accidentally contain Scala 2.11 classes. Scala 2.12 users should not use 1.12.3 at all; upgrade straight to 1.12.4 and the problem disappears. See the original issue report for the details of the fix.
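After bumping the dependency, it can be worth verifying which Flink build actually ended up on the classpath. A quick sanity check, assuming the EnvironmentInformation helpers in flink-runtime (the same ones Flink uses for its startup banner):

import org.apache.flink.runtime.util.EnvironmentInformation;

public class FlinkVersionCheck {
    public static void main(String[] args) {
        // Prints the Flink version actually on the classpath, e.g. 1.12.4
        System.out.println("Flink version: " + EnvironmentInformation.getVersion());
        // Prints the Scala version the runtime artifacts were built against
        System.out.println("Scala version: " + EnvironmentInformation.getScalaVersion());
    }
}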
3. No console output from the sample code after upgrading to version 1.12+
With Flink versions 1.9.3 through 1.11.3, the sample code printed as expected:
8> [LoginEvent(id=2, userId=2, ip=127.0.0.2, type=fail), LoginEvent(id=4, userId=2, ip=127.0.0.1, type=fail)]
1> [LoginEvent(id=8, userId=4, ip=127.0.0.1, type=fail), LoginEvent(id=10, userId=4, ip=127.0.0.1, type=fail)]
But when run on Flink 1.12, the same code produced no output at all. The reason is that Flink 1.12 changed the default stream time characteristic to TimeCharacteristic.EventTime; in other words, the default switched from processing time to event time, and that is the problem.
However, Flink 1.12 also deprecated StreamExecutionEnvironment.setStreamTimeCharacteristic(), so the old way of calling setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) to restore the processing-time default is discouraged. Looking at the source code of the PatternStream class instead, you will find the following methods:
/** Sets the time characteristic to processing time. */
public PatternStream<T> inProcessingTime() {
    return new PatternStream<>(builder.inProcessingTime());
}

/** Sets the time characteristic to event time. */
public PatternStream<T> inEventTime() {
    return new PatternStream<>(builder.inEventTime());
}
As the comments indicate, a PatternStream can be switched to processing time with inProcessingTime() and to event time with inEventTime(). So modify the sample code as follows, and it prints normally again.
// Before the fix
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
collector.collect(map.get("one").toString());
}
}).print();
// After the fix
patternStream
.inProcessingTime()
.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
collector.collect(map.get("one").toString());
}
}).print();
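Alternatively, if you would rather keep the new event-time default than switch back, the stream needs timestamps and watermarks before event-time CEP can make progress. A minimal sketch, with the caveat that LoginEvent has no real timestamp field, so the event id is used as a stand-in monotonically increasing timestamp purely for illustration:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Assign timestamps and watermarks so event-time CEP can fire.
// Treating the event id as the timestamp is an assumption of this sketch only;
// a real job would carry a proper event-time field on LoginEvent.
DataStream<LoginEvent> withTimestamps = dataStreamSource
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((event, recordTimestamp) -> event.getId() * 1000L));

PatternStream<LoginEvent> eventTimePatternStream = CEP.pattern(
        withTimestamps.keyBy(LoginEvent::getUserId),
        pattern); // runs in event time by default on Flink 1.12+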