The Apache Flink community is proud to announce the release of Version 1.11.0 of Flink! More than 200 contributors contributed to the development of Flink 1.11.0, submitting more than 1,300 fixes or optimizations. These changes greatly improve the usability of Flink and enhance the functionality of the individual API stacks. Some of the more significant changes include:

  1. The core engine introduces an unaligned Checkpoint mechanism. This is an important improvement to Flink's fault-tolerance mechanism that can significantly speed up checkpointing for jobs under severe backpressure.

  2. Implemented a new set of Source interfaces. By unifying the stream and batch source operation mechanisms and providing common internal implementations such as event time processing, watermark generation, and idleness detection, the new Source interface greatly reduces the complexity of developing new sources.

  3. Flink SQL introduces support for CDC (Change Data Capture), which makes it easy for Flink to interpret and consume changelogs from databases through tools like Debezium. The Table API and SQL also extend the file system connector to cover more user scenarios and formats, supporting use cases such as writing streaming data from Kafka to Hive.

  4. PyFlink optimizes performance in several parts, including support for vectorized user-defined functions (Python UDFs). These changes enable the Flink Python interface to interoperate with commonly used Python libraries such as Pandas and NumPy, making Flink more suitable for data processing and machine learning scenarios.

The binary distribution and source code for Flink 1.11.0 are available on the Download page of the Flink website, and the corresponding PyFlink distribution is available on the PyPI website. For details, see release notes, release feature updates, and updated documentation.

We hope you will download the release, try it out, and share your feedback with us via the Flink mailing lists and JIRA.

Download address

Flink.apache.org/downloads.h…

New features and optimizations

Unaligned Checkpoint (Beta)

When Flink initiates a Checkpoint, the Checkpoint Barrier flows from the Sources of the topology all the way to the Sinks. For operators with more than one input, the barriers from each input first need to be aligned before the operator can snapshot its state and forward the Barrier to downstream operators. Alignment can normally be done in milliseconds, but it can become a bottleneck under backpressure:

  1. The Checkpoint Barrier propagates very slowly through a backpressured input channel (it has to wait for the preceding data to be processed), which blocks data processing on the other input channels and eventually puts further backpressure on upstream operators.

  2. Slow Checkpoint Barrier propagation also causes Checkpoints to take too long or even time out; in the worst case, Checkpoints can no longer complete at all.

To improve Checkpoint performance under backpressure, the Flink community implemented a first version of the unaligned Checkpoint mechanism (FLIP-76) in 1.11.0. In contrast to aligned Checkpoints (Figure 1), operators no longer need to wait for the barriers from the various input channels to be aligned; instead, the Barrier is allowed to overtake the data waiting to be processed (that is, the data in the output and input buffers) and trigger the synchronous phase of the Checkpoint directly. This process is shown in Figure 2.

Since the in-flight data must be persisted as part of the snapshot, the unaligned Checkpoint mechanism increases the Checkpoint size. The good news is that it greatly reduces the time needed to complete a Checkpoint, so users will see more progress even in unstable environments, because a more recent Checkpoint reduces the load during Recovery. For more information on unaligned Checkpoints and future development plans, please refer to the documentation and FLINK-14551, respectively.

As with other Beta features, we look forward to hearing about your experience with it in the community after you try it out.

Note: Enabling this feature requires setting the enableUnalignedCheckpoints option in the Checkpoint configuration (CheckpointConfig). Note that unaligned Checkpoints only take effect when CheckpointingMode is set to CheckpointingMode.EXACTLY_ONCE.
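Below is a minimal sketch of how this option could be switched on through the DataStream API; the checkpoint interval and the rest of the job are placeholders.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds with exactly-once semantics,
        // which is required for unaligned checkpoints.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Opt in to the new (Beta) unaligned checkpoint mechanism.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // ... define sources, transformations and sinks here ...

        env.execute("unaligned-checkpoint-example");
    }
}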

Unified Watermark generator

Currently, Flink’s Watermark generation (also known as assignment) relies on two interfaces: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. The relationship between these interfaces and record timestamp extraction is rather convoluted, which makes it hard for Flink to implement functionality that users urgently need, such as idleness detection, and it also leads to duplicated code that is difficult to maintain. With FLIP-126, the existing Watermark generation interfaces are unified into a single interface, WatermarkGenerator, which is decoupled from TimestampAssigner.

This change gives users more control over the watermark emission logic and simplifies the implementation of Sources that support watermark generation and timestamp extraction (see the new Source interface). Based on this interface, Flink 1.11 also provides a number of built-in watermark generation strategies (for example, forBoundedOutOfOrderness and forMonotonousTimestamps), and users can plug in their own implementations.
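As an illustration, here is a small sketch of building one of the built-in strategies and attaching a timestamp assigner; MyEvent and its getEventTime() accessor are hypothetical.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Tolerate events that arrive up to 5 seconds out of order and extract the
// event timestamp from the record (MyEvent and getEventTime() are hypothetical).
WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previousTimestamp) -> event.getEventTime());

// The strategy can then be passed to DataStream#assignTimestampsAndWatermarks
// or to a source that supports the new interface.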

■ Support Watermark idle detection

The WatermarkStrategy.withIdleness() method allows users to mark a stream as idle when no record arrives within a configured time (i.e., a timeout). This further enables Flink to correctly handle event-time skew between parallel source instances and prevents idle instances from holding back the event time of the entire job. By migrating the Kafka connector to the new interface (FLINK-17669), users can benefit from idleness detection for individual parallel source instances.
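A sketch of how such a timeout could be configured on top of the strategy shown above; the one-minute value is an arbitrary example.

// If no record arrives on a split/partition for one minute, mark it as idle so
// that it does not hold back the event time of the rest of the pipeline.
WatermarkStrategy<MyEvent> strategyWithIdleness =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1));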

Note: This FLIP does not affect existing applications for now, but we recommend using the new Watermark generator interface going forward, to be prepared for the deprecation of the previous Watermark generator interfaces in subsequent releases.

New Source Interface (Beta)

Writing a production-ready Flink Source connector is not an easy task: it requires some knowledge of Flink’s internal implementation, and the connector itself has to implement event time extraction, watermark generation, and idleness detection. Flink 1.11 addresses this by introducing a new Source interface (FLIP-27), which also solves the problem of having to write two separate Source implementations for batch and streaming jobs.

The new Source interface splits the work of a source into two components, SplitEnumerator and SourceReader: the former discovers the partitions (splits) to read, while the latter consumes the data of each split. This allows arbitrary combinations of different partition discovery strategies with different split consumption implementations.

For example, the existing Kafka connector offers multiple partition discovery strategies whose implementations are coupled with the rest of the connector code. When migrated to the new interface, the Kafka Source will be able to use the same partition consumption implementation (SourceReader) and have separate SplitEnumerator implementations for the different partition discovery strategies.
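For orientation, the sketch below shows the rough shape of the new interface as described in FLIP-27. It is simplified for illustration only; the authoritative signatures are in the Data Source documentation.

// Simplified shape of the FLIP-27 Source interface, for illustration only.
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

    // Whether the source is bounded (batch) or unbounded (streaming).
    Boundedness getBoundedness();

    // Reads the records of the splits assigned to it.
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

    // Discovers splits and assigns them to readers.
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);

    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint);

    // Serializers used when checkpointing splits and enumerator state.
    SimpleVersionedSerializer<SplitT> getSplitSerializer();
    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}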

■ Unified stream and batch

A Source connector implemented against the new Source interface can be used for both bounded (batch) and unbounded (streaming) jobs. The two scenarios differ only in a small way: in the bounded case, the partition discovery strategy returns a fixed set of partitions, each containing a finite amount of data; in the unbounded case, either the amount of data per partition is unbounded, or the partition discovery strategy keeps generating new partitions.

■ Built-in Watermark and event time handling

In the new Source interface, TimestampAssigner and WatermarkGenerator run transparently as part of the SourceReader, so users do not need to implement any timestamp extraction or watermark generation code themselves.
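A minimal sketch of attaching such a source to a job; myNewStyleSource stands for a hypothetical FLIP-27-style source, since the built-in connectors have not yet been migrated (see the note below).

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Timestamps and watermarks are produced inside the SourceReader,
// driven by the WatermarkStrategy handed over here.
env.fromSource(
        myNewStyleSource,
        WatermarkStrategy.forMonotonousTimestamps(),
        "my-new-style-source");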

Note: The existing Source connector has not yet been re-implemented based on the new Source interface, which will be gradually completed in subsequent releases. If you want to implement your own Source based on the new Source interface, consult the Data Source documentation and some suggestions for Source development.

Application Deployment Mode

Prior to 1.11, Flink jobs could be deployed in two modes: Session mode, in which jobs are submitted to a long-running Flink Session cluster, and Job mode, in which a dedicated Flink Job cluster is started for each job. In both modes, the main method of the user’s job is executed on the client side, which has some drawbacks: first, if the client is part of a larger program, generating the JobGraph can easily become a bottleneck; second, this approach does not fit well with container environments such as Docker and Kubernetes.

Flink 1.11 introduces a new deployment mode, the Application mode (FLIP-85). In this mode, the main method of the user program runs in the cluster rather than on the client. Job submission thereby becomes very simple: users package the program logic and its dependencies into a single executable jar, and the cluster’s entry point (ApplicationClusterEntryPoint) is responsible for invoking the main method to generate the JobGraph.

Flink 1.11 already supports Kubernetes-based Application mode (FLINK-10934).

Other functional changes

■ Unified JM memory configuration (FLIP-116)

In 1.10, Flink unified memory management and configuration on the TM side; in 1.11, Flink further reworks the memory configuration on the JM side so that its options and configuration model are consistent with the TM-side configuration introduced in FLIP-49. This change affects all deployment types, including standalone, YARN, Mesos and the newly introduced Kubernetes.

Note: Reusing the previous Flink configuration will result in different JVM parameters, which can affect performance or even cause exceptions. Be sure to refer to the migration documentation if you want to update to 1.11.

■ Enhanced Web UI functions

In 1.11, the community made a number of optimizations to the Flink Web UI. The first change was to optimize TM and JM’s log presentation (FLIP-103), followed by the introduction of a tool to print a list of all threads (Flink-14816) in the Flink Web UI. In future releases, the Web UI will be further optimized, including better backpressure detection, more flexible and configurable exception display, and display of Task error history.

■ Unified Docker image

Flink 1.11 consolidates all Docker-related resources into the apache/flink-docker repository and extends the entry point script so that users can run the default Docker image in different modes, avoiding the need to build their own images in many cases. Please refer to the detailed documentation on how to use and customize the official Flink Docker image in different environments and deployment modes.

Table API/SQL: support for CDC (Change Data Capture)

CDC is a common pattern in databases for capturing the changes committed to a database and broadcasting them to downstream consumers. CDC can be used in scenarios such as synchronizing multiple data stores and avoiding the problems caused by double writes. Flink users have long wanted to be able to import CDC data into their jobs via the Table API/SQL, and Flink 1.11 makes this possible.

To support CDC in the Table API/SQL, Flink 1.11 updates the Table Source and Sink interfaces to support the changelog mode (see the new Table Source and Sink interfaces) and adds support for the Debezium and Canal formats (FLIP-105). With this change, a dynamic Table Source is no longer limited to append-only data: it can ingest external changelogs (insert, update and delete events), interpret them as the corresponding change operations, and emit those operations together with their type downstream.

To consume CDC data, users specify “format=debezium-json” or “format=canal-json” when creating a table with SQL DDL:

  CREATE TABLE my_table (
    ...
  ) WITH (
    'connector' = '...',  -- e.g. 'kafka'
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true',  -- default: false (Debezium can be configured to include or exclude the message schema)
    'debezium-json.ignore-parse-errors' = 'true'  -- default: false
  );

Flink 1.11 only supports Kafka as a source of changelogs, and only JSON-encoded changelogs. Subsequent releases will add support for the Avro (Debezium) and Protobuf (Canal) formats. There are also plans to support MySQL binlogs and Kafka compacted topics as sources, and to extend changelog support to batch jobs.

Note: There is a known bug (FLINK-18461) that prevents a changelog Source from being written to an upsert Sink (e.g. MySQL, HBase, Elasticsearch). This will be fixed in the next release (1.11.1).

Table API/SQL: supports JDBC Catalog and Postgres Catalog

Flink 1.11 provides a common JDBC Catalog interface (FLIP-93) that allows Table API/SQL users to automatically derive table schemas from relational databases connected via JDBC. This eliminates the need for users to manually copy table schemas and define type mappings, and allows Flink to check table schemas at compile time rather than at run time.

The first implementation in 1.11 was the Postgres Catalog.
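A sketch of registering such a catalog from the Table API; it assumes the flink-connector-jdbc dependency and the Postgres driver are on the classpath, and all connection parameters are placeholders.

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

// The catalog reads table schemas directly from the Postgres instance
// behind this JDBC base URL, so no CREATE TABLE statements are needed.
JdbcCatalog catalog = new JdbcCatalog(
        "my_pg_catalog",           // catalog name
        "postgres",                // default database
        "username", "password",    // credentials (placeholders)
        "jdbc:postgresql://localhost:5432");

tEnv.registerCatalog("my_pg_catalog", catalog);
tEnv.useCatalog("my_pg_catalog");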

Table API/SQL: File system connector that supports Avro, ORC and Parquet formats

To improve the user experience of end-to-end streaming ETL with Flink, Flink 1.11 introduces a new file system connector in the Table API/SQL. It is implemented on top of Flink’s own file system abstraction and StreamingFileSink, ensuring the same capabilities and consistent behavior with the DataStream API.

This also means that users of the Table API/SQL can use all file formats that StreamingFileSink already supports, such as (Avro) Parquet, as well as the new file formats added in 1.11, such as Avro and ORC.

CREATE TABLE my_table (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/file',
  'format' = '...'  -- supported formats: Avro, ORC, Parquet, CSV, JSON
);

The new unified file system connector transparently supports both streaming and batch jobs, provides exactly-once semantics and full partitioning support, greatly expanding the range of scenarios covered by the previous connector. For example, users can easily write streaming data from Kafka into Hive.

For further file system connector optimizations, see Flink-17778.

Table API/SQL: Supports Python UDF

Before 1.11, users of the Table API/SQL could only implement UDFs in Java or Scala. In 1.11, Flink extends the places where Python UDFs can be used: in addition to the Python Table API (PyFlink), they are now also supported in SQL DDL (FLIP-106) and the SQL Client (FLIP-114). Users can also register Python UDFs in the system Catalog through SQL DDL or the Java Catalog API, so that these UDFs can be shared among jobs.
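As a sketch, a Python UDF could be registered and used from a Java Table API program roughly as follows; tEnv is a TableEnvironment as in the earlier catalog sketch, the module path my_udfs.add_one and the table name are hypothetical, and the Python environment has to be configured separately (see the PyFlink configuration documentation).

// Register a Python function via SQL DDL so it can be used in SQL queries
// (my_udfs.add_one is a hypothetical fully qualified Python function).
tEnv.executeSql(
        "CREATE TEMPORARY SYSTEM FUNCTION add_one AS 'my_udfs.add_one' LANGUAGE PYTHON");

// Use it like any other function.
tEnv.executeSql("SELECT add_one(score) FROM my_source_table");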

Other Table API/SQL optimizations

■ Hive connector compatible with Hive DDL and DML (FLIP-123)

Starting with 1.11, users can write SQL statements in Hive syntax (HiveQL) in the Table API/SQL and the SQL Client. For this purpose, Flink introduces the notion of a SQL dialect, which can be switched per statement between Flink (default) and Hive (hive). For the complete list of supported DDL and DML statements, refer to the Hive dialect documentation.
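A sketch of switching the dialect from the Table API; the Hive DDL statement is only a hypothetical example and assumes a HiveCatalog and the Hive dependencies are already set up.

import org.apache.flink.table.api.SqlDialect;

// Parse the following statements with HiveQL syntax.
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_orders (order_id BIGINT, amount DOUBLE) STORED AS ORC");

// Switch back to the default Flink dialect afterwards.
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);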

■ Flink SQL syntax extensions and optimizations

  • Flink 1.11 introduced the concept of primary key constraints, which can be used for runtime optimizations in Flink SQL DDL (FLIP-87); see the DDL sketch after this list.

  • VIEW objects are already fully supported in SQL DDL and can be used with statements such as CREATE/ALTER/DROP VIEW (FLIP-71).

  • Users can dynamically specify or override options for tables using dynamic Table properties in DQL and DML (FLIP-113).

  • To simplify the configuration of connector parameters and improve exception handling, the Table API/SQL changed the names of some configuration items (FLIP-122). This change does not break compatibility and users can still use the old name.
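As mentioned in the first bullet above, a primary key can now be declared in DDL. A minimal sketch (table name and connector options are placeholders; NOT ENFORCED is required because Flink does not own the data and cannot enforce the constraint itself):

// Declare a primary key on a table defined via SQL DDL.
tEnv.executeSql(
        "CREATE TABLE users ("
        + "  user_id BIGINT,"
        + "  name STRING,"
        + "  PRIMARY KEY (user_id) NOT ENFORCED"
        + ") WITH ("
        + "  'connector' = '...'"   // placeholder connector options
        + ")");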

■ New Table Source and Sink interface (FLIP-95)

Flink 1.11 introduces new Table Source and Sink interfaces (DynamicTableSource and DynamicTableSink) that unify batch and streaming jobs, provide more efficient data processing together with the Blink planner, and support handling changelogs (see the CDC support above). The new interfaces also reduce the complexity of implementing new custom connectors and of modifying existing ones. Refer to the documentation for an example of implementing a custom scan table source together with a decoding format that supports changelog semantics.

Note: Although this change does not break compatibility, we recommend that users of the Table API/SQL upgrade their existing Source and Sink to the new interface as soon as possible.

■ Refactored TableEnvironment interface (FLIP-84)

Before 1.11, similar methods on TableEnvironment and Table did not behave in exactly the same way, which led to inconsistent interfaces and user confusion. To solve this problem and make programming against the Table API/SQL smoother, Flink 1.11 introduces new methods that unify these behaviors, such as the timing of execution triggering (executeSql()) and result retrieval (print(), collect()), and lays the foundation for important features such as multi-statement execution in later releases.

Note: The methods deprecated by FLIP-84 will not be removed immediately, but we recommend using the new methods. For a complete list of new and deprecated methods, check the summary section of FLIP-84.
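For illustration, the new unified methods mentioned above can be used roughly as follows; the query and table name are placeholders, and tEnv is a TableEnvironment as in the earlier sketches.

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// executeSql() submits the statement immediately; no separate execute() call is needed.
// Print the result directly to stdout:
tEnv.executeSql("SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id").print();

// Or collect the rows programmatically:
try (CloseableIterator<Row> it =
        tEnv.executeSql("SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id").collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        // process row ...
    }
}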

■ New type inference for Table API UDFs (FLIP-65)

In Flink 1.9, the community began working on a new type system for the Table API to improve its consistency with standard SQL (FLIP-37). In 1.11, this effort is nearing completion with support for the new type system in Table API UDFs (scalar and table functions are currently supported; support for aggregate functions is planned for the next release).

PyFlink: Supports Pandas UDF

Prior to 1.11, the Python UDF in PyFlink supported only the standard Python scalar types. This brings some limitations:

  1. Passing data between the JVM and Python processes leads to significant serialization and deserialization overhead.

  2. It is difficult to integrate commonly used high-performance Python numerical computation frameworks, such as Pandas and NumPy.

To overcome these limitations, the community introduced support for vectorized (scalar) Python UDFs based on Pandas (FLIP-97). Because the serialization/deserialization overhead is minimized by leveraging Apache Arrow, vectorized UDFs generally perform very well; in addition, using pandas.Series as the input and output type means that the pandas and NumPy libraries can be fully reused. These features make Pandas UDFs particularly well suited to parallel machine learning and other large-scale, distributed data science workloads, such as feature extraction or distributed model application.

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(),
     udf_type="pandas")
def add(i, j):
    return i + j

To turn the UDF above into a Pandas UDF, you only need to add the extra parameter udf_type="pandas" to the udf decorator, as shown in the documentation.

Other optimizations for PyFlink

■ Support for fromPandas/toPandas conversion (FLIP-120)

Arrow is also used to optimize the conversion between PyFlink Table and Pandas.DataFrame, allowing users to switch seamlessly between different processing engines without having to write special connectors for the transfer. For examples of how to use the fromPandas() and toPandas() methods, refer to the documentation.

■ Support for user-defined table functions (UDTF) (FLINK-14500)

Starting with 1.11, users can define and register custom UDTFs in PyFlink. Like Python UDFs, UDTF can accept zero, one, or more scalar values as arguments, but can return arbitrary rows of data as output rather than a single value.

■ UDF performance optimization based on Cython (FLIP-121)

Cython is a compiled superset of the Python language that is often used to improve the performance of large-scale numerical computation, because it optimizes code execution down to the machine instruction level and integrates well with common C-based numerical libraries such as NumPy. Starting with Flink 1.11, users can build PyFlink with Cython support and optimize their Python UDFs through Cython. This can significantly improve performance (up to a 30-fold improvement over Python UDFs in 1.10).

■ Python UDF support for user-defined metrics (FLIP-112)

To make it easier for users to monitor and debug the execution of Python UDFs, PyFlink now supports collecting metrics and reporting them to external systems, as well as defining custom scopes and variables. Users can access the metric system in the UDF’s open method by calling function_context.get_metric_group(), as shown in the documentation.

Other important optimizations

  • [Flink-17339] As of 1.11, Blink Planner will become the default Table API/SQL Planner. In fact, SQL Client’s default Planner has become Blink Planner in 1.10. The old Planner will still be supported, but no major changes will follow.

  • [FLINK-5763] Savepoints now write all state (including metadata and program state) into a single directory. This makes it easy to see which files belong to each Savepoint and allows users to relocate a Savepoint simply by moving its directory.

  • [FLINK-16408] To reduce pressure on the JVM metaspace, Flink 1.11 reuses the ClassLoader of a TaskExecutor as long as at least one Slot of the corresponding job remains allocated on it. This changes Flink’s failure-recovery behavior, because static fields are not re-initialized.

  • [FLINK-11086] Flink now supports Hadoop 3.0.0 and above. Note that the Flink project does not provide any updated “flink-shaded-hadoop-*” jars; instead, users need to add their Hadoop dependencies to the HADOOP_CLASSPATH environment variable (recommended) or to the lib/ directory.

  • [FLINK-16963] All of Flink’s built-in metric reporters are now Flink plugins. If they are to be used, they should not be placed in the lib/ directory (which can cause class conflicts) but in the plugins/ directory.

  • [FLINK-12639] The community is reworking the Flink documentation, so starting with 1.11 you may notice changes in the navigation and content organization of the documentation.

Detailed Release Notes

If you want to upgrade to 1.11, please read the detailed release notes. Compared with all previous 1.x versions, 1.11 only guarantees compatibility for interfaces annotated with @Public.