1 Introduction

The previous article on Time introduced three notions of time: Event, Ingestion, and Process. It also briefly covered out-of-order Event Time events and the mechanism that handles them, the Watermark.

(Some articles translate the term as "water level", because the window trigger condition Watermark >= window_end_time resembles water rising until it overflows. You can just as well call it a watermark; it is a matter of personal preference.)

2 Watermark theory

2.1 Concept of Watermark

A Watermark is a mechanism for measuring the progress of Event Time. It is a hidden attribute derived from the data itself. For example, for an event with timestamp = 1575090298299 (2019-11-30 13:04:58) and an allowed delay of 3 s, the watermark extracted from the event would be:

watermark(1575090298299) = 1575090298299 - 3000 (2019-11-30 13:04:55)

Emitting this watermark means that all events with a timestamp less than 1575090298299 - 3000 (2019-11-30 13:04:55) are considered to have reached the Flink program.
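The arithmetic above can be checked with a few lines of plain Java. This is only a sketch: the class and method names are made up for illustration, and the UTC+8 zone is an assumption chosen because it reproduces the dates quoted in the text.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class WatermarkArithmetic {
    // Allowed delay of 3 s, as in the example above
    static final long MAX_DELAY_MS = 3000L;

    // Watermark derived from a single event: its timestamp minus the allowed delay
    static long watermarkFor(long eventTimestampMs) {
        return eventTimestampMs - MAX_DELAY_MS;
    }

    // Format epoch milliseconds in UTC+8 (assumed zone, see the note above)
    static String format(long epochMs) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                .withZone(ZoneId.of("Asia/Shanghai"))
                .format(Instant.ofEpochMilli(epochMs));
    }

    public static void main(String[] args) {
        long eventTs = 1575090298299L;
        long watermark = watermarkFor(eventTs);
        System.out.println(format(eventTs) + " -> watermark " + watermark
                + " (" + format(watermark) + ")");
    }
}
```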

In the ideal case (the Ideal line), events arrive in order, so event time coincides with processing time. In the real case, various factors delay events, which gives the Reality line. The Skew between the two lines represents the possible delay in arrival.

2.2 Functions of Watermark

Events may be delayed, so they may arrive out of order. The Watermark exists to solve this problem.

A Watermark is essentially a monotonically increasing timestamp. As long as events keep arriving in the program, the watermark can be updated to a larger value.

The watermark tells the program how far event time has progressed: all data with a timestamp smaller than the watermark is assumed to have arrived, and subsequent events are expected to carry a larger timestamp.

Events that still arrive with a timestamp smaller than the watermark, after their window has already fired, are considered late data. Flink's default policy is to discard them without computing on them (other mechanisms can collect discarded late data; see sideOutputLateData for details).

In some cases, we want the window to wait a few seconds before triggering its calculation. For example, given a time window [00:01]-[00:04], an event with timestamp 03 may only show up at second 05. We want to wait for that delayed event and add it to the correct window for calculation.

Watermarks in a message queue (the number on each event represents its timestamp):

The time window size in the figure above is 4 s, and the time attribute is Event Time. The data in the message queue arrives in the program out of order, and the dividing lines W(4) and W(9) represent watermarks (as the analysis of the figure below shows, the maximum allowed delay is set to 3 s).

In the example above, data 7 enters the second window before its watermark has been generated. Data 3 then enters the first window, and only after that is the pending watermark generated. The global watermark is updated to W(4). Since W(4) >= window1_end_time, the calculation of the first window is triggered.

Subsequent data 5 and 6 continue to enter the second window, and data 9 and 12 enter the third window.

Then data 12 enters the program and is assigned to the third window. The resulting watermark W(9) is greater than the end time of the second window, which triggers the calculation of the second window, and so on.

As shown above, data 3 arrived after data 7, so it was late. However, the watermark setting tolerates events that are late by up to a certain time, so watermarks can handle out-of-order events to a certain extent.
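The window assignment and trigger check described here can be sketched in plain Java. This is a simplified model under stated assumptions: timestamps are whole seconds, windows are aligned to 0 with no offset, and the trigger uses this article's condition watermark >= window_end_time (Flink's own event-time trigger compares against the window's last timestamp, end - 1 ms, which gives the same result here). All names are hypothetical.

```java
public class WindowTriggerSketch {
    static final long WINDOW_SIZE = 4; // seconds, as in the figure

    // Start of the tumbling window containing the timestamp
    // (assumes non-negative timestamps and no window offset)
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE);
    }

    // Exclusive end of the tumbling window containing the timestamp
    static long windowEnd(long timestamp) {
        return windowStart(timestamp) + WINDOW_SIZE;
    }

    // Simplified trigger condition used in this article
    static boolean shouldFire(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long[] events = {7, 3, 5, 6, 9};
        for (long ts : events) {
            System.out.println("event " + ts + " -> window ["
                    + windowStart(ts) + ", " + windowEnd(ts) + ")");
        }
        System.out.println("W(4) fires [0, 4): " + shouldFire(4, 4));
        System.out.println("W(4) fires [4, 8): " + shouldFire(4, 8));
        System.out.println("W(9) fires [4, 8): " + shouldFire(9, 8));
    }
}
```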

2.3 Operator Processing to Watermark

Quote from Louisvv’s blog post

Watermarks are handled by operators. Each operator keeps an internal timer that records the end time of each Window.

When an operator receives a Watermark, it updates its internal Event Time clock from the Watermark's timestamp: it compares the currently recorded time with the Watermark, and if the Watermark is larger, the record is updated to the new Watermark value.
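The update rule can be illustrated with a tiny stand-alone class. It is a sketch of the rule as described in this section, not Flink's operator code, and the class and method names are invented for the example.

```java
public class EventTimeClockSketch {
    // Event time as currently recorded by this (hypothetical) operator
    private long currentWatermark = Long.MIN_VALUE;

    // Advance the clock only if the incoming watermark is larger.
    // Returns true when the clock actually moved forward.
    boolean onWatermark(long watermarkTimestamp) {
        if (watermarkTimestamp > currentWatermark) {
            currentWatermark = watermarkTimestamp;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        EventTimeClockSketch clock = new EventTimeClockSketch();
        System.out.println(clock.onWatermark(4)); // clock moves to 4
        System.out.println(clock.onWatermark(2)); // smaller watermark is ignored
        System.out.println(clock.onWatermark(9)); // clock moves to 9
        System.out.println(clock.currentWatermark());
    }
}
```

Because a smaller watermark never moves the clock back, the recorded event time stays monotonic even if watermarks reach the operator out of order.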

2.4 Setting of Watermark

To handle out-of-order events, we need to set up a Watermark that tolerates a certain amount of delay (but not an unbounded amount, since timeliness and memory usage matter).

A watermark is usually generated immediately after the data is received from the DataSource, or after a simple map or filter operation that follows the source.

There are two ways to set Watermark:

In code, DataStream offers two APIs for extracting timestamps and assigning watermarks: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks.

  • Punctuated Watermark: every increase in EventTime in the data stream produces a Watermark

In a real production environment with high TPS, this generates a large number of Watermarks and puts some pressure on downstream operators. This method is therefore used only in scenarios with very strict real-time requirements.

  • Periodic Watermark: generates Watermarks periodically (at a fixed time interval or after a certain number of records)

In real production environments, periodic watermarks are used more often. They are generated at a fixed interval (set in milliseconds via setAutoWatermarkInterval(…)), but the generation should take both dimensions into account, elapsed time and accumulated record count; otherwise extreme cases can see very large delays.
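The difference between the two styles can be sketched without Flink. In the model below, the punctuated generator emits whenever the maximum event time increases, while the periodic one emits only on a "tick". The tick is approximated as every N records, which is an assumption standing in for the real time-based interval, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkStrategiesSketch {
    static final long MAX_DELAY = 3; // seconds

    // Punctuated style: emit a watermark each time the maximum event time increases
    static List<Long> punctuated(long[] events) {
        List<Long> emitted = new ArrayList<>();
        long max = Long.MIN_VALUE;
        for (long ts : events) {
            if (ts > max) {
                max = ts;
                emitted.add(max - MAX_DELAY);
            }
        }
        return emitted;
    }

    // Periodic style: track the maximum on every event, emit only on a tick.
    // A tick here is every `everyN` records (stand-in for a timer interval).
    static List<Long> periodic(long[] events, int everyN) {
        List<Long> emitted = new ArrayList<>();
        long max = Long.MIN_VALUE;
        for (int i = 0; i < events.length; i++) {
            max = Math.max(max, events[i]);
            if ((i + 1) % everyN == 0) {
                emitted.add(max - MAX_DELAY);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] events = {2, 3, 1, 7, 3, 5, 9, 6, 12};
        System.out.println("punctuated: " + punctuated(events));
        System.out.println("periodic:   " + periodic(events, 3));
    }
}
```

For the same nine events the punctuated sketch emits five watermarks and the periodic one three, which is the downstream-pressure trade-off described above.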

Take a look at their position and structure in the code:

Let’s take a look at the commonly used PeriodicWatermarks with code examples

3 PeriodicWatermarks Demo

Let’s start with the logic of the program:

  1. Send data to port 9010: nc -l 9010
  2. Write a program that listens on port 9010, reads the input line by line, and maps each line into a Tuple2 of (key, time)
  3. Extract the time and generate the watermark
  4. Use an event-time window of 4 s with an allowed delay of 3 s

The custom watermark generation rule implements AssignerWithPeriodicWatermarks, which requires two methods: getCurrentWatermark (generates the Watermark) and extractTimestamp (extracts the timestamp).

3.1 Main logic of the program

    int port = 9010;
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Use event time; the default is processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<String> text = env.socketTextStream("127.0.0.1", port, "\n");

    // Split each "key,timestamp" line into a Tuple2<key, timestamp>
    DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String value) throws Exception {
            String[] arr = value.split(",");
            return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
        }
    });

    // Extract timestamps and generate watermarks
    DataStream<Tuple2<String, Long>> waterMarkStream =
            inputMap.assignTimestampsAndWatermarks(new WordCountPeriodicWatermarks());

    DataStream<String> window = waterMarkStream.keyBy(0)
            // Assign windows by the event time of each message: 4 s tumbling windows
            .window(TumblingEventTimeWindows.of(Time.seconds(4)))
            .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                  Collector<String> out) throws Exception {
                    String key = tuple.toString();
                    List<Long> arrarList = new ArrayList<>();
                    List<String> eventTimeList = new ArrayList<>();
                    Iterator<Tuple2<String, Long>> it = input.iterator();
                    while (it.hasNext()) {
                        Tuple2<String, Long> next = it.next();
                        arrarList.add(next.f1);
                        // Keep only the two "seconds" digits of the timestamp for display
                        eventTimeList.add(String.valueOf(next.f1).substring(8, 10));
                    }
                    Collections.sort(arrarList);
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                    String result = "\n key value:" + key + "\n " +
                            "Number of data in trigger window:" + arrarList.size() + "\n " +
                            "Trigger window start data:" + sdf.format(arrarList.get(0)) + "\n " +
                            "Trigger window last (possibly delayed) data:" + sdf.format(arrarList.get(arrarList.size() - 1))
                            + "\n " +
                            "Event data in window:" + Joiner.on(",").join(eventTimeList) + "\n" +
                            "Actual window start and end times:" + sdf.format(window.getStart()) + "----" + sdf.format(window.getEnd()) + " \n \n ";
                    out.collect(result);
                }
            });
    window.print();
    env.execute("eventtime-watermark");

The code above receives the data written to port 9010, maps each line into a tuple, assigns timestamps and watermarks, and finally passes the stream to the window operator for the window calculation.

3.2 Watermark Generator

    import java.text.SimpleDateFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class WordCountPeriodicWatermarks implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

        // Largest event timestamp seen so far
        private Long currentMaxTimestamp = 0L;

        // The maximum allowed out-of-orderness is 3 s
        private final Long maxOutOfOrderness = 3000L;

        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            long timestamp = element.f1;
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            long id = Thread.currentThread().getId();
            System.out.println("Thread ID:" + id +
                    " key:" + element.f0 +
                    ", event time:[" + sdf.format(element.f1) +
                    "], currentMaxTimestamp:[" +
                    sdf.format(currentMaxTimestamp) + "], watermark time:[" +
                    sdf.format(getCurrentWatermark().getTimestamp()) + "]");
            return timestamp;
        }
    }

The extractTimestamp() method extracts the Event Time from an attribute of the data itself and returns it. Along the way it updates currentMaxTimestamp to Math.max(timestamp, currentMaxTimestamp), that is, the larger of the event's timestamp and the largest timestamp seen so far.

getCurrentWatermark() returns the current watermark. The maximum delay defined here is 3 s, and it is subtracted when the watermark is generated. For example, after data 7 comes in, the watermark is computed as w(7 - 3) = w(4). An event that arrives behind the watermark after its window calculation has been triggered is discarded by default.

3.3 Run the program and test data

Remember to open port 9010 in a terminal before running the program: nc -l 9010

Test data (in seconds): 2, 3, 1, 7, 3, 5, 9, 6, 12, 17, 10, 16, 19, 11, 18. The first column is the key, fixed at 001 for easy identification. The second column is the timestamp, set to the corresponding second from the test data above.

001,1575129602000
001,1575129603000
001,1575129601000
001,1575129607000
001,1575129603000
001,1575129605000
001,1575129609000
001,1575129606000
001,1575129612000
001,1575129617000
001,1575129610000
001,1575129616000
001,1575129619000
001,1575129611000
001,1575129618000

3.4 Ideal verification results

In the first test, I copied the whole batch of data to the terminal, and the program processed it:

The output is as follows:

Event Time  CurrentMaxTimeStamp  Watermark  Window_Start_Time  Window_End_Time
00:00:02    00:00:02             23:59:59   00:00:00           00:00:04
00:00:03    00:00:03             00:00:00
00:00:01    00:00:03             00:00:00
00:00:07    00:00:07             00:00:04   00:00:04           00:00:08
00:00:03    00:00:07             00:00:04   00:00:00           00:00:04
00:00:05    00:00:07             00:00:04   00:00:04           00:00:08
00:00:09    00:00:09             00:00:06   00:00:08           00:00:12
00:00:06    00:00:09             00:00:06   00:00:04           00:00:08
00:00:12    00:00:12             00:00:09   00:00:12           00:00:16
00:00:17    00:00:17             00:00:14   00:00:16           00:00:20
00:00:10    00:00:17             00:00:14
00:00:16    00:00:17             00:00:14
00:00:19    00:00:19             00:00:16
00:00:11    00:00:19             00:00:16
00:00:18    00:00:19             00:00:16

Look at the first window: the second data 3 comes in after data 7, yet it still goes into the first window correctly, which shows that setting a Watermark can resolve out-of-order events.

3.5 Leave a question

With the same test data, the previous run pasted the whole batch at once, and the program and the Watermark behaved as expected. When the records are sent one at a time, however, the windows fire differently:

Focus on the first window again. In the previous run it contained four events; this time it contains only three, and the later data 3 was discarded. I cannot explain this clearly yet, so I am leaving it as an open question to come back to.
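One way to probe this behavior, offered as a simplified model rather than a confirmed explanation, is to replay the test data in plain Java while advancing the watermark after every single event. That is an assumption: it roughly corresponds to events arriving slowly enough that a periodic watermark is emitted between any two of them. The class and its fields are hypothetical, timestamps are in whole seconds, and an event is dropped when its window's end time is already at or behind the watermark.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OneByOneReplaySketch {
    static final long WINDOW_SIZE = 4; // seconds
    static final long MAX_DELAY = 3;   // seconds

    // window start -> events buffered in that window
    final Map<Long, List<Long>> openWindows = new TreeMap<>();
    final List<String> fired = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>();
    long maxTimestamp = Long.MIN_VALUE;
    long watermark = Long.MIN_VALUE;

    void onEvent(long ts) {
        long start = ts - (ts % WINDOW_SIZE);
        long end = start + WINDOW_SIZE;
        // The window has already fired: the event is late and is discarded
        if (end <= watermark) {
            dropped.add(ts);
            return;
        }
        openWindows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);

        // Assumption: the watermark advances right after every event
        maxTimestamp = Math.max(maxTimestamp, ts);
        watermark = maxTimestamp - MAX_DELAY;

        // Fire and remove every window whose end time the watermark has reached
        Iterator<Map.Entry<Long, List<Long>>> it = openWindows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            long windowEnd = e.getKey() + WINDOW_SIZE;
            if (windowEnd <= watermark) {
                fired.add("[" + e.getKey() + "," + windowEnd + ")=" + e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        OneByOneReplaySketch sketch = new OneByOneReplaySketch();
        long[] events = {2, 3, 1, 7, 3, 5, 9, 6, 12, 17, 10, 16, 19, 11, 18};
        for (long ts : events) {
            sketch.onEvent(ts);
        }
        System.out.println("fired:   " + sketch.fired);
        System.out.println("dropped: " + sketch.dropped);
    }
}
```

In this model the first window fires with three events (2, 3, 1) as soon as data 7 pushes the watermark to 4, so the second data 3 arrives too late. If the whole batch is pasted at once, the events may all arrive before the next periodic watermark is emitted, which would be consistent with the four-event window seen in the first run.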

3.6 Combination of Watermark and Window

As the example above shows, using watermarks requires setting maxOutOfOrderness to some delay. If the delay is set too high, many windows can be open in the application at once without being triggered.

Besides the excessive server memory usage, which can lead to OOM, the window calculation is triggered late. As a result, the real-time quality of the statistics deteriorates and business output is affected.

Following zhisheng's advice, keep two points in mind for future use:

  • Set maxOutOfOrderness sensibly and avoid making it too large
  • If a scenario does not rely heavily on Event Time, do not set the time policy to EventTime
4 How to deal with delayed data

4.1 Default Policy: Discard

This is the default processing mode. As the previous example shows, data 10 arrives after data 17. By then the watermark is larger than 10 and the window [08 <–> 12] that data 10 belongs to has already been triggered, so data 10 is discarded and does not take part in the calculation.
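The discard decision for data 10 can be written as a one-line check. The sketch below is modeled on the rule that an element is late once the watermark has passed the last timestamp of its window (end - 1 ms) plus any allowed lateness, which is 0 by default. Treat the exact formula as an assumption about Flink's behavior; the class and method names are invented, and times are milliseconds relative to 00:00:00 of the test data.

```java
public class LateDataCheckSketch {
    // Late when the watermark has passed the window's last timestamp plus the allowed lateness
    static boolean isLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        return (windowEndMs - 1) + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long windowEnd = 12_000; // end of window [08, 12), which contains data 10
        long watermark = 14_000; // watermark after data 17 has arrived (17 s - 3 s)

        // Default policy, no allowed lateness: data 10 is late and is discarded
        System.out.println(isLate(windowEnd, 0, watermark));

        // With a hypothetical allowed lateness of 3 s the same event would still be accepted
        System.out.println(isLate(windowEnd, 3_000, watermark));
    }
}
```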

4.2 Other Options

  • allowedLateness: specifies an additional period during which late data is still accepted
  • sideOutputLateData: collects late data

Both of these deserve in-depth study of their own, so for now I recommend reading what others have written. I will share my own notes after I have studied them:

  • Flink flow computing programming — A detailed introduction and consideration of allowedLateness in Flink

5 Summary

  • How does Flink handle out-of-order events?

Use the Watermark + Window mechanism

  • When does Flink trigger a Window?

With the normal setup, where allowedLateness is not set to grant extra delay:

Watermark >= window_end_time

For allowedLateness, see reference 4

  • Suggestions for using Watermark in Flink

  1. Set maxOutOfOrderness sensibly and avoid making it too large
  2. Do not set the time policy to EventTime for scenarios that do not rely heavily on EventTime

Flink designed the Watermark to handle events that are delayed because they arrive out of order. This article introduced two ways of generating watermarks: Periodic (generated at intervals) and Punctuated (generated on each increase). Taking the Periodic watermark as an example, the code and test data verified that delayed events are processed correctly.

While studying Watermark further, I consulted many articles and gained a general understanding of its concept and use, but some questions and pitfalls remain. If you understand them, I hope you will share. Please feel free to discuss any suggestions or mistakes in this article with me.

6 Project Address

Github.com/Vip-Augus/f…

git clone https://github.com/Vip-Augus/flink-learning-note

7 Reference Materials

  1. Flink Window analysis and Watermark analysis of out-of-order data mechanism -Flink test
  2. Flink Stream computing programming — Introduction to Watermark
  3. Flink WaterMark profile
  4. Flink flow computing programming — A detailed introduction and consideration of allowedLateness in Flink
