Flink data stream programming model
1. Levels of abstraction
Flink provides different levels of abstraction to develop stream or batch jobs.
- The lowest level of abstraction simply offers stateful streaming, which is embedded into the DataStream API via Process Functions. It allows users to freely process events from one or more streams with consistent, fault-tolerant state, and to register event-time and processing-time callbacks, so programs can implement sophisticated computations.
- In practice, most applications do not need the low-level abstraction described above and instead program against the Core APIs: the DataStream API (bounded or unbounded streams) and the DataSet API (bounded data sets). These APIs provide common building blocks for data processing, such as user-defined transformations, joins, aggregations, windows, state, and so on. The data types processed by these APIs are represented as classes in the respective programming languages.
The low-level Process Functions integrate with the DataStream API, making the lower-level abstraction available for specific operations when needed. The DataSet API offers additional primitives for bounded data sets, such as loops/iterations.
- The Table API is a declarative DSL centered around tables, which may be dynamically changing (when representing streams). The Table API follows the (extended) relational model: tables are two-dimensional data structures (similar to tables in relational databases), and the API offers comparable operations such as select, project, join, group-by, aggregate, and so on. A Table API program declaratively defines what logical operation should be done, rather than specifying exactly how the code for that operation looks. Although the Table API can be extended with various types of user-defined functions (UDFs), it is less expressive than the Core APIs, but much more concise to use (less code to write). In addition, Table API programs go through a built-in optimizer before execution.
Tables can be seamlessly converted to and from DataStream/DataSet, allowing programs to mix the Table API with the DataStream and DataSet APIs.
- The highest level of abstraction offered by Flink is SQL. This abstraction is similar to the Table API in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly over tables defined with the Table API.
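As a rough illustration of the two higher levels, the sketch below defines one table and aggregates it once with the Table API and once with SQL. The table name, schema, and the datagen connector are illustrative choices, not something prescribed by the text above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class TableAndSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical source table; the datagen connector just produces random rows.
        tEnv.executeSql(
                "CREATE TABLE Orders (user_id BIGINT, amount DOUBLE) "
                        + "WITH ('connector' = 'datagen')");

        // Table API: declarative, relational-style operations on the table.
        Table totalsViaTableApi = tEnv.from("Orders")
                .groupBy($("user_id"))
                .select($("user_id"), $("amount").sum().as("total"));

        // SQL: the same logic expressed as a query string over the same table.
        Table totalsViaSql = tEnv.sqlQuery(
                "SELECT user_id, SUM(amount) AS total FROM Orders GROUP BY user_id");
    }
}
```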
2. Programs and data flows
The basic building blocks of Flink are streams and transformations. (Note that the DataSets used by Flink’s DataSet API are also streams internally — more on that later.) Conceptually, a stream is a (potentially never-ending) flow of data records, while a transformation is an operation that takes one or more streams as input and produces one or more output streams as a result.
When executed, Flink programs are mapped to streaming dataflows consisting of streams and transformation operators. Each dataflow starts at one or more sources and ends at one or more sinks. The dataflows resemble arbitrary directed acyclic graphs (DAGs). Although special forms of cycles are permitted via iteration constructs, for the most part we gloss over this for simplicity.
Typically, there is a one-to-one correspondence between the transformations in a program and the operators in the dataflow. Sometimes, however, one transformation may consist of multiple operators.
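The following minimal sketch shows how a program's building blocks map onto a dataflow: a source, one transformation, and a sink. The sample input and the word-splitting logic are made up for illustration.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataflowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to be or not to be")                          // source
           .flatMap((FlatMapFunction<String, String>) (line, out) -> {  // transformation
               for (String word : line.split("\\s")) {
                   out.collect(word);
               }
           })
           .returns(String.class)   // the lambda erases the output type, so declare it
           .print();                // sink

        env.execute("dataflow sketch");
    }
}
```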
3. Parallel data flows
Flink programs are inherently parallel and distributed. During execution, a stream has one or more stream partitions, and each operator has one or more operator subtasks. Operator subtasks are independent of one another, execute in different threads, and may even run on different machines or containers.
The number of subtasks of an operator is the parallelism of that particular operator. The parallelism of a stream is the parallelism of its producing operator. Different operators in the same program may have different levels of parallelism.
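As a small sketch of per-operator parallelism, the snippet below sets a default parallelism on the environment and then overrides it for individual operators; the concrete numbers are arbitrary examples.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                    // default parallelism for operators in this job

        env.fromElements(1, 2, 3, 4, 5)           // (this example source runs with a single subtask)
           .map(n -> n * 2).setParallelism(2)     // this operator runs as 2 parallel subtasks
           .print().setParallelism(1);            // the sink runs as a single subtask

        env.execute("parallelism sketch");
    }
}
```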
Streams transport data between two operators, either in a one-to-one (forwarding) pattern or in a redistributing pattern:
- A one-to-one flow (for example, between the Source and map() operators in the figure above) preserves partitioning and ordering of elements. That means that the map() operator’s subtask [1] will see the same elements in the same order that the Source subtask [1] generated them.
- Redistributing streams (such as those between map() and keyBy/window, and between keyBy/window and Sink in the figure above) change the partitioning of the stream. Each operator subtask sends data to different target subtasks, depending on the selected transformation: for example keyBy() (repartitions by hashing the key), broadcast(), or rebalance() (repartitions randomly). In a redistributing exchange, the ordering among elements is only preserved within each pair of sending and receiving subtasks (for example, subtask [1] of map() and subtask [2] of keyBy/window). So in this example, the ordering within each key is preserved, but the parallelism does introduce non-determinism regarding the order in which the aggregated results for different keys arrive at the sink.
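A minimal sketch of these exchange patterns is shown below, assuming a stream of (word, count) tuples: keyBy() triggers a redistributing (hash-partitioned) exchange, while rebalance() and broadcast() are only noted in comments.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1));

        // Redistributing exchange: hash-partition on the key, so all records
        // with the same key are routed to the same downstream subtask.
        words.keyBy(t -> t.f0)
             .sum(1)
             .print();

        // Other redistributing patterns mentioned above (not wired up here):
        // words.rebalance();   // round-robin repartitioning
        // words.broadcast();   // every record goes to every downstream subtask

        env.execute("partitioning sketch");
    }
}
```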
4. Windows
Aggregating events (e.g., counts, sums) works differently on streams than in batch processing. For example, it is impossible to count all the elements in a stream, because streams are in general infinite (unbounded). Instead, aggregations on streams are scoped by windows, such as “count over the last 5 minutes” or “sum of the last 100 elements.”
Windows can be time-driven (e.g., every 30 seconds) or data-driven (e.g., every 100 elements). Windows are typically categorized into different types, such as tumbling windows (no overlap), sliding windows (with overlap), and session windows (punctuated by gaps of inactivity).
More window examples can be found in this blog post.
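As a rough sketch of these window types, the snippet below applies a tumbling time window, a sliding time window, and a count window to a keyed stream; the key names, window sizes, and the use of processing time are illustrative assumptions.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> events = env.fromElements(
                Tuple2.of("sensor-1", 3), Tuple2.of("sensor-1", 5), Tuple2.of("sensor-2", 1));

        // Tumbling (non-overlapping) time window: one result per key every 5 minutes.
        events.keyBy(t -> t.f0)
              .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
              .sum(1)
              .print();

        // Sliding (overlapping) time window: last 5 minutes, evaluated every minute.
        events.keyBy(t -> t.f0)
              .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
              .sum(1)
              .print();

        // Count window (data-driven): sum over the last 100 elements per key.
        events.keyBy(t -> t.f0)
              .countWindow(100)
              .sum(1)
              .print();

        env.execute("window sketch");
    }
}
```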
5. Time
When referring to time in a streaming program (for example, when defining windows), one can mean different notions of time:
- Event time is the time at which an event was created. It is usually described by a timestamp in the event, for example attached by the producing sensor or the producing service. Flink accesses event timestamps via timestamp assigners (see the sketch after this list).
- Ingestion time is the time at which an event enters the Flink dataflow at the source operator.
- Processing time is the local time of each operator that performs a time-based operation.
More details on how to handle time can be found in the event time documentation.
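A minimal event-time sketch is shown below: it assumes each record carries an epoch-millisecond timestamp in its second field and uses a bounded-out-of-orderness watermark strategy with a timestamp assigner; the five-second bound is an arbitrary example.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each record carries its own event-time timestamp in field f1 (epoch millis).
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1_000L), Tuple2.of("b", 2_000L));

        // Tell Flink where the event-time timestamp lives and how much
        // out-of-orderness to tolerate before advancing watermarks.
        DataStream<Tuple2<String, Long>> withTimestamps = events.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, previousTimestamp) -> event.f1));

        withTimestamps.print();
        env.execute("event time sketch");
    }
}
```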
6. Stateful operations
While many operations in a dataflow look at only one individual event at a time (such as an event parser), some operations remember information across multiple events (such as window operators). These operations are called stateful.
The state of stateful operations is maintained in what can be thought of as an embedded key/value store. State is strictly partitioned and distributed along with the streams read by the stateful operators. Consequently, the key/value state can only be accessed on keyed streams, i.e., after a keyBy() function, and only for the values associated with the current event’s key. Aligning the keys of the stream and the state ensures that all state updates are local operations, guaranteeing consistency without transaction overhead. This alignment also allows Flink to transparently redistribute the state and adjust the stream partitioning.
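As a sketch of keyed state, the function below keeps a running count per key in a ValueState; the class and state names are illustrative, and the function is only usable downstream of a keyBy().

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Running count per key, kept in keyed (key/value) state.
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();            // state scoped to the current key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);                   // local update, checkpointed by Flink
        out.collect(Tuple2.of(value.f0, updated));
    }
}
```

It would be applied as something like `stream.keyBy(t -> t.f0).flatMap(new CountPerKey())`.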
7. Checkpoints for fault tolerance
Flink implements fault tolerance using a combination of stream replay and checkpointing. A checkpoint relates to a specific point in each input stream along with the corresponding state of each of its operators. A streaming dataflow can be resumed from a checkpoint while maintaining consistency (exactly-once processing semantics) by restoring the operator state and replaying the events from the point of the checkpoint.
The checkpoint interval is a means of trading off the overhead of fault tolerance during execution against the recovery time (the number of events that need to be replayed).
More details on checkpoint and fault tolerance can be found in the fault tolerance documentation.
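As a small configuration sketch, the snippet below enables checkpointing with a ten-second interval and exactly-once mode; the interval is an arbitrary example of the trade-off described above.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 10 seconds; a shorter interval means less to
        // replay on recovery, a longer one means less overhead during normal runs.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // ... define sources, transformations, and sinks here ...

        // env.execute("checkpoint sketch");
    }
}
```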
Batch processing on streams
Flink executes batch programs as a special case of streaming programs, where the streams are bounded (a finite number of elements). Internally, a DataSet is treated as a stream of data. The concepts above thus apply to batch programs in the same way as to streaming programs, with a few exceptions:
- Programs in the DataSet API do not use checkpoints. Recovery happens by fully replaying the streams. This is possible because the input is bounded. It pushes the cost more toward recovery, but makes regular processing cheaper because checkpoints are avoided.
- Stateful operations in the DataSet API use simplified in-memory/out-of-core data structures rather than key/value indexes.
- The DataSet API introduces special synchronized (superstep-based) iterations, which are only possible on bounded streams. See the iteration documentation for details.
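A minimal DataSet sketch is shown below: a bounded word count whose grouping and sum are the kind of stateful aggregation mentioned above; the input string is made up for illustration.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BoundedWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A bounded input: internally Flink treats this DataSet as a finite stream.
        DataSet<String> text = env.fromElements("to be or not to be");

        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))  // the lambda erases the output type
                .groupBy(0)   // group by the word field
                .sum(1);      // aggregate over the bounded input

        counts.print();       // triggers execution for the DataSet API
    }
}
```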