
In the last article we walked through the overall workflow of Flink. This article takes a closer look at how we understand Watermark.

Why Watermarks are needed

When Flink processes a stream, data goes from event generation, to the stream source, and then on to the operators. In most cases the data arriving at an operator follows the order in which the events were generated, but network delays and similar issues can cause it to arrive out of order. In particular, when consuming from Kafka, messages coming from multiple partitions are not ordered relative to each other. The Watermark mechanism was created so that such out-of-order data can still be grouped and computed correctly within a reasonable time.

What is a Watermark

Simply put, a Watermark is a special timestamp that Flink uses to drive EventTime window calculations. It is a system event generated by the source or by a custom watermark generator as needed, and it flows to the downstream operators just like a normal stream event. An operator that receives a Watermark event continuously advances its internal EventTime clock accordingly, and Flink guarantees that watermarks increase monotonically. When an operator receives a Watermark, it knows how far along the time dimension the stream has been processed.
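To make this concrete, here is a minimal sketch of an EventTime pipeline (the element values, the 5-second out-of-orderness bound, and the window size are made up for illustration): a tumbling EventTime window can only fire once the operator's watermark has passed the end of that window.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (event time in ms, key) -- a tiny in-memory stream, just for illustration
        DataStream<Tuple2<Long, String>> events = env.fromElements(
                Tuple2.of(1_000L, "a"), Tuple2.of(7_000L, "a"), Tuple2.of(32_000L, "a"));

        events
            // watermark = largest event time seen so far minus 5 seconds of tolerated disorder
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            .withTimestampAssigner((event, timestamp) -> event.f0))
            .keyBy(event -> event.f1)
            // the [0s, 10s) window fires once the watermark passes 10s
            // (when a bounded source ends, Flink emits a final watermark, so remaining windows fire too)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .sum(0)
            .print();

        env.execute("watermark-sketch");
    }
}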

Generating Watermarks

In versions of Flink newer than 1.10, creating watermarks no longer requires implementing your own interface. The Flink API provides a WatermarkStrategy, which bundles a TimestampAssigner and a WatermarkGenerator. The WatermarkStrategy utility class ships with many common watermark strategies, and you can still build your own strategy for special scenarios.

public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps, according to this strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

As shown above, we typically do not need to implement this interface ourselves. Instead we can use one of the watermark strategies built into the WatermarkStrategy utility class, or use that class to bind our own TimestampAssigner and WatermarkGenerator together. For example, to use a bounded-out-of-orderness watermark generator with a lambda expression as the timestamp assigner:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);

Setting a TimestampAssigner is optional and usually does not need to be specified; with Kafka, for example, the timestamp can be taken directly from the record provided by the data source.
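For instance, a sketch under that assumption (MyType is only a placeholder for whatever record type the connector produces): the strategy is built without a timestamp assigner, so the timestamp already attached to each record by the connector is used as the event time.

// No withTimestampAssigner(...): the timestamp the connector attaches to each
// record (e.g. the Kafka record timestamp) is used as the event time.
WatermarkStrategy<MyType> strategy =
        WatermarkStrategy.<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20));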

Using a WatermarkStrategy

A WatermarkStrategy can be applied in two places in Flink: directly on the data source, or after a non-source operation.

The first approach is recommended, because the source can then take its knowledge of shards/partitions into account in the watermarking logic. It does, however, require using the specific source interface, for example the Kafka connector when reading from Kafka. Use the second approach only when the strategy cannot be set directly on the data source.

// First approach: set the strategy directly on the source
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<MyType> stream = env.addSource(kafkaSource);

// Second approach: set the strategy after a non-source operation
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter(event -> event.severity() == WARNING)
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy((event) -> event.getGroup())
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> a.add(b))
        .addSink(...);

If a partition/shard of the data source does not send any event data for a while, the WatermarkStrategy has no data from which to generate watermarks. For this case you can configure an idleness timeout: once it is exceeded, the shard or partition is marked as idle and no longer holds back the watermark.

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1)); // if the time exceeds 1 minute, the partition is set to idle

Custom WatermarkGenerator

In addition to the watermark strategies that ship with the Flink API, we can also build our own by implementing the WatermarkGenerator interface:

/**
 * A {@code WatermarkGenerator} generates watermarks either based on events or periodically.
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous
 * {@code AssignerWithPunctuatedWatermarks} and {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event. Allows the generator to inspect and remember the event
     * timestamp, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically; might emit a new watermark, or might not.
     *
     * <p>The interval at which this method is called is configured via
     * {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
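For illustration, here is a minimal periodic generator in the spirit of the bounded-out-of-orderness strategy shown earlier. It is only a sketch: MyEvent is a placeholder event type and the 3.5-second bound is arbitrary.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/**
 * A sketch of a periodic generator that assumes events arrive at most
 * 3.5 seconds later than the largest timestamp seen so far.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // only track the largest timestamp seen so far; no watermark is emitted here
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // called at the auto-watermark interval; emit the largest timestamp minus the bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

A generator like this can be wrapped into a strategy with WatermarkStrategy.forGenerator(ctx -> new BoundedOutOfOrdernessGenerator()) and combined with withTimestampAssigner as shown earlier.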