In 2013 Google published an important paper, MillWheel, on how it achieves exactly-once semantics in stream processing. It isn’t the first exactly-once implementation (Storm’s Trident is arguably a little earlier), but its use of the low watermark and per-key storage concepts is genuinely innovative. The original is here: https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
To achieve exactly-once semantics, we need to introduce a few concepts:
- Persistent storage: the results of processing the event stream must be stored durably, because many of those results need to be read back again later
- Low watermark: event streams reach Google’s servers from all over the world, so there are delays as data flows between data centers. MillWheel therefore asks you to define a time window within which all data should reach your event-stream handler
- Duplicate prevention: any duplicate row in the incoming data is detected and dropped
Each row of MillWheel data can be understood as a triple of key, value, and timestamp; the low watermark is computed from the timestamps the senders attach.
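To make that concrete, here is a minimal sketch of a row in Python; the class and field names are mine, not MillWheel’s actual interface:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Record:
    """One row of stream data, modeled on the paper's (key, value, timestamp) triple."""
    key: str          # e.g. an ad ID or user ID; all per-key state hangs off this
    value: bytes      # opaque payload; user code deserializes it (e.g. via Protobuf)
    timestamp: float  # event time in epoch seconds; feeds the low-watermark computation

# A click on ad 42 with event time 1700000000
click = Record(key="ad:42", value=b"...", timestamp=1_700_000_000.0)
```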
The processing as a whole is driven by a DAG (directed acyclic graph) that the user provides, describing how the contents of these event streams are handled, so the user can layer a variety of computations on top of each other. For example, when I click on an ad on a certain obscure Q&A website, that click can be aggregated by ad ID, by advertiser ID, and by user ID; the processing logic for each is separate, but all of it runs under the MillWheel framework.
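As a toy illustration of that fan-out (all field names are made up), the same click can be re-keyed once for each computation in the DAG:

```python
def fan_out(click: dict):
    """Illustrative DAG fan-out: one click event feeds three separate
    computations, each keyed on a different dimension."""
    for dimension in ("ad_id", "advertiser_id", "user_id"):
        yield (f"{dimension}={click[dimension]}", click)

event = {"ad_id": "42", "advertiser_id": "a9", "user_id": "u7"}
for key, payload in fan_out(event):
    print(key)  # ad_id=42, advertiser_id=a9, user_id=u7
```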
The MillWheel framework guarantees that every row of data is checkpointed per key and that every row is delivered exactly once. Let’s look at how MillWheel achieves this guarantee and how we can take advantage of it.
Since checkpointing is done per key, every row of data needs logic that can extract the key from it. Internally there are many deserialization protocols, such as Protobuf, used to read out the key. Which key to use depends on the business logic you’re dealing with: in an advertising system, the advertiser’s ID or the ID of the user who clicked the ad is a reasonable key.
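A minimal sketch of key extraction, using JSON as a stand-in for Protobuf; the field name is just an example:

```python
import json

def extract_key(raw: bytes) -> str:
    """Illustrative key extraction: deserialize the payload (JSON here as a
    stand-in for Protobuf) and pick the field the business logic keys on."""
    event = json.loads(raw)
    return event["advertiser_id"]  # key clicks by advertiser, as suggested above

print(extract_key(b'{"advertiser_id": "a9", "ad_id": "42"}'))  # a9
```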
Once a key is provided, MillWheel also gives you per-key persistent storage so you can implement your business logic more easily. For example, I may need to report to advertisers how many clicks each ad receives every five minutes, but I don’t want to store that much data for every user; instead, for each user I can just keep a HyperLogLog of their recent clicks, which is enough to judge from their recent click volume whether they are a bot and whether the clicks are valid.
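Here is a rough sketch of what a per-key state update could look like, using Python’s `shelve` as a local stand-in for the real persistent store; the function and key layout are illustrative only:

```python
import shelve  # a local stand-in for MillWheel's per-key persistent store

def record_click(state_path: str, ad_key: str, timestamp: float) -> int:
    """Illustrative per-key state update: bump the click counter for the
    5-minute window this event falls into, and return the new count."""
    window_start = int(timestamp // 300) * 300
    with shelve.open(state_path) as state:
        bucket = f"{ad_key}/{window_start}"
        state[bucket] = state.get(bucket, 0) + 1
        return state[bucket]

# For user keys, the stored value could instead be a compact sketch such as
# a HyperLogLog, rather than the full click history.
```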
Of course, not every ad click reaches the system promptly. Along with the key, each row of data carries a timestamp, which is used to compute the low watermark. The low watermark is defined by the timestamp of the oldest data that has arrived at MillWheel but has not yet been processed, bounded so that it never falls more than a user-defined interval behind the current time. Any data older than this window is late data, and the state of late data is not kept in memory. The cool thing about this design is that if processing stays fast and no messages are late, the low watermark stays very close to the actual time; and if data is late, the watermark never lags beyond the user-set limit.
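A small sketch of this behavior as I’ve just described it; the bound and the function names are my own reading, not the paper’s exact formula:

```python
import time

def low_watermark(oldest_pending_ts: float, max_lag_seconds: float) -> float:
    """Sketch of the bound described above: the watermark tracks the oldest
    unprocessed timestamp but, per the user-defined limit, never falls more
    than max_lag_seconds behind the current time."""
    return max(oldest_pending_ts, time.time() - max_lag_seconds)

def is_late(record_ts: float, watermark: float) -> bool:
    # Anything older than the watermark is late; its state is not kept in memory.
    return record_ts < watermark
```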
Because we have the concept of a low watermark, there must be a service that computes it. MillWheel’s design is that each event-stream handler reports the timestamp of its own oldest unprocessed data, and the injector then collects the oldest of these timestamps. We take the oldest because every processor should agree on the watermark, and it should be the most conservative value.
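In code, the aggregation is just a minimum; the handler names here are made up:

```python
def global_low_watermark(oldest_pending_by_handler: dict) -> float:
    """Each handler reports the timestamp of its oldest unprocessed record;
    the global low watermark takes the minimum, the most conservative value."""
    return min(oldest_pending_by_handler.values())

print(global_low_watermark({"handler-1": 1_700_000_060.0,
                            "handler-2": 1_700_000_010.0}))
# 1700000010.0 -- the oldest pending timestamp wins
```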
Each time a row of data arrives and passes the low-watermark check, MillWheel needs to do the following (a sketch follows the list):
- Check that this row is not a duplicate
- Run the user-supplied processing logic
- Persist all resulting state to the database
- ACK the server that sent the data, signaling that processing is finished
- The server then sends the next rows of data to the processor
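Here is a compact sketch of those five steps; every interface in it is a placeholder, not MillWheel’s real API:

```python
from collections import namedtuple

# Illustrative message carrying the sender-assigned unique ID discussed below.
Msg = namedtuple("Msg", "id key payload")

def handle(msg, seen_ids, state, user_fn, ack):
    """Sketch of the five steps above; all interfaces are placeholders."""
    if msg.id in seen_ids:                 # 1. duplicate check
        ack(msg.id)                        # re-ACK so the sender stops retrying
        return
    state[msg.key] = user_fn(msg, state.get(msg.key))  # 2. user logic
    seen_ids.add(msg.id)                   # 3. persist state and ID (one atomic
                                           #    write in the real system)
    ack(msg.id)                            # 4. ACK; 5. the sender sends more rows

# Toy usage: count clicks per key; ack by printing the message ID.
handle(Msg("m1", "ad:42", b"..."), set(), {}, lambda m, s: (s or 0) + 1, print)
```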
Here, the sender assigns each row a unique ID, and the receiver dedups based on that ID. Sometimes, to optimize throughput, you can fetch many rows from the server and process them as one batch.
There are a couple of more complicated situations to consider. For example, the output data also needs to be checkpointed; otherwise the same time interval could produce two completely different outputs, because the aggregation state at the end was not saved before sending. With checkpointed output, the entire data processing pipeline effectively becomes an idempotent service. Of course, some processing is itself idempotent, in which case the dedup can be omitted, or the output can be broadcast downstream before the checkpoint.
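A sketch of the two options, with invented interfaces; the paper calls these strong and weak productions:

```python
def produce_strong(key, outputs, checkpoint, send):
    """'Checkpoint then send': a retry after a crash replays exactly the
    same output, making the production effectively idempotent."""
    checkpoint[key] = outputs        # persisted first (atomic in the real system)
    for out in outputs:
        send(out)

def produce_weak(outputs, send):
    """If downstream consumers are already idempotent, the outputs can be
    broadcast first and the checkpoint and dedup skipped or deferred."""
    for out in outputs:
        send(out)
```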
Another point to note is that each key must have only one writer, and each key’s state must be written atomically; otherwise there is no way to keep the state of each key consistent.
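The paper guards against stale (“zombie”) writers by attaching a sequencer to each write; here is a rough sketch of the idea, again with invented interfaces:

```python
def write_state(store, key, new_state, my_token):
    """Sketch of single-writer enforcement: each write carries the writer's
    token, and a stale writer (e.g. a zombie process left over from before a
    failover) is rejected, keeping each key's state consistent."""
    current_token, _ = store.get(key, (0, None))
    if my_token < current_token:
        raise RuntimeError(f"stale writer for {key}: a newer owner exists")
    store[key] = (my_token, new_state)  # token and state in one atomic write
```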
The end of the paper mainly discusses the results after deployment and some edge cases; I’ll pick out a few interesting points here.
I’ll close with a few words about why this is an important problem. In the Internet advertising business, all kinds of events are billed based on data streams: Google charges other companies for ad clicks, and YouTube charges other companies for video views. In these cases the money matters a great deal: if you charge too little, the company loses revenue; if you charge too much, your numbers will diverge from the third-party-audited numbers on the paying company’s side, and that becomes a serious reputation problem. So this stream processing can neither overcount nor undercount; ideally each row is processed exactly once and charged exactly once.
At this point you might step back and ask: to handle billing, couldn’t I just dedup daily or hourly, and then tell the user how many clicks their ad got and how much they’ll be charged? The problem is that in many cases revenue is reported to advertisers in real time, because advertisers want to be able to see at any moment how effective their ads are, and then choose to raise or lower their budgets. In certain situations (like the Super Bowl or Black Friday), where an advertiser’s attitude is essentially “this is how much money I’m spending today, and I don’t care which platform it goes to,” real-time reporting becomes especially important.
OK, one more time, the original is here: https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf. I’ve been reading a lot of systems papers lately, but I still haven’t finished my reading list. Once I get through the systems list, I’ll write up some ML-related papers for you.