New features and optimizations

Memory management and configuration optimization

Flink’s existing TaskExecutor memory model has several shortcomings that make it hard to optimize resource utilization, for example:

  • Streaming and batch jobs use different memory configuration models;
  • The RocksDB state backend used in stream processing requires complex, user-specific tuning.

In order to make memory configuration clearer and more intuitive, Flink 1.10 makes major changes to the TaskExecutor memory model and configuration logic (FLIP-49 [7]). These changes allow Flink to adapt better to all deployment environments (such as Kubernetes, YARN, and Mesos) and give users tighter control over their memory consumption.

■ Managed memory expansion

The scope of managed memory has been extended to cover the memory used by the RocksDB state backend. While batch jobs can use managed memory either on-heap or off-heap, streaming jobs with the RocksDB state backend can only use off-heap memory. Therefore, to let users switch between streaming and batch jobs without changing the cluster configuration, managed memory is now always off-heap.

■ Simplified RocksDB configuration

Previously, configuring an off-heap state backend such as RocksDB required a fair amount of manual tuning, for example shrinking the JVM heap or forcing Flink to use off-heap memory. With Flink’s out-of-the-box configuration this is no longer necessary, and the memory budget of the RocksDB state backend can be adjusted simply by changing the size of managed memory.
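For illustration, here is a minimal flink-conf.yaml sketch; the values are arbitrary examples rather than recommendations:

state.backend: rocksdb
taskmanager.memory.process.size: 4096m      # total memory of the TaskExecutor process
taskmanager.memory.managed.fraction: 0.4    # share of Flink memory reserved as managed (off-heap) memory
# alternatively, set an absolute size instead of a fraction:
# taskmanager.memory.managed.size: 1024m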

Another important optimization is that Flink can now bound the native memory footprint of RocksDB (FLINK-7289 [8]) so that the total memory budget is not exceeded, which is especially important in containerized environments such as Kubernetes. For details on how to enable and tune this feature, see the RocksDB tuning documentation [9].
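The relevant configuration options are sketched below (the values are illustrative; the exact semantics are described in the documentation):

state.backend.rocksdb.memory.managed: true               # bound RocksDB memory by the slot's managed memory (default)
state.backend.rocksdb.memory.write-buffer-ratio: 0.5     # share of that budget used for write buffers
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1   # share reserved for index and filter blocks
# or pin an absolute per-slot budget instead:
# state.backend.rocksdb.memory.fixed-per-slot: 512mb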

Note: FLIP-49 changes the cluster resource configuration process, so upgrading from an earlier Flink version may require adjusting the cluster configuration. For a detailed change log and tuning guide, please refer to the documentation [10].

Unified job submission logic

Previously, job submission was the responsibility of the execution environment and was tightly coupled to the individual deployment targets (YARN, Kubernetes, Mesos). This resulted in separate configurations for different environments and increased the cost of administration.

In Flink 1.10, job submission logic is abstracted behind a generic Executor interface (FLIP-73 [11]). The newly added ExecutorCLI (FLIP-81 [12]) introduces a unified way of specifying configuration parameters for any execution target [13]. In addition, with the introduction of a JobClient (FLIP-74 [14]) that is responsible for fetching the JobExecutionResult, the logic for obtaining job execution results is decoupled from job submission.
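As a rough sketch of what this looks like from user code (using the executeAsync()/JobClient API introduced by these FLIPs; the job itself is just a placeholder):

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitAsyncExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // Non-blocking submission: the deployment target (YARN, Kubernetes, ...)
        // is picked up from the configuration, not from environment-specific code.
        JobClient client = env.executeAsync("submit-async-example");

        // The JobClient decouples status/result retrieval from submission.
        System.out.println("Submitted job " + client.getJobID());
        client.getJobStatus().thenAccept(status -> System.out.println("Current status: " + status));
    }
}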





These changes provide users with a unified entry point to Flink and make it easier to use Flink programmatically from downstream frameworks such as Apache Beam or Zeppelin notebooks. For users who need to run Flink in many different environments, the new configuration-based execution process also significantly reduces redundant code and maintenance overhead.

Native Kubernetes Integration (Beta)

Previously, users who wanted to try Flink in a containerized environment had to deploy and manage a standalone Flink cluster on Kubernetes themselves, which requires some prior knowledge of containers, operators, and environment tools such as kubectl.

In Flink 1.10, we rolled out the first phase of active Kubernetes integration, with support for session mode (FLINK-9953 [15]). “Active” means that Flink’s ResourceManager (K8sResMngr) communicates natively with Kubernetes and requests pods on demand, just as Flink does on YARN and Mesos. Users can also use Kubernetes namespaces to run Flink clusters in multi-tenant environments with lower resource overhead. An RBAC role and a service account with sufficient permissions need to be configured beforehand.
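For example, a session cluster can be started with the bundled kubernetes-session.sh script; the cluster ID and resource values below are placeholders:

./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=<ClusterId> \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4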





As mentioned in the section on unified job submission logic, Flink 1.10 maps command line parameters to a unified configuration. Users can therefore refer to the Kubernetes configuration options and submit a Flink job to the session cluster from the command line with the following command:

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
If you want to try this feature out, please consult the documentation [16], give it a go, and share your feedback with the community.

Table API/SQL: Hive integration available for production

Flink 1.9 shipped a preview of the Hive integration, which allowed users to persist Flink-specific metadata in the Hive Metastore via SQL DDL, call UDFs defined in Hive, and read and write Hive tables. Flink 1.10 further develops and refines this feature, delivering a production-ready Hive integration that is fully compatible with major Hive versions [17].

■ Native partition support for batch SQL

Previously, Flink only supported writing to unpartitioned Hive tables. Flink 1.10 extends the Flink SQL syntax with INSERT OVERWRITE and PARTITION clauses (FLIP-63 [18]), allowing users to write to both static and dynamic partitions of Hive tables.

  • Writing to a static partition:
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)]  select_statement1 FROM from_statement;
  • Writing to a dynamic partition:
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;
Full support for partitioned tables also lets users benefit from partition pruning on the read path, which reduces the amount of data that needs to be scanned and thus greatly improves the performance of these operations.
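As a concrete illustration (the table and column names below are made up), writing one static partition and then letting Flink derive partitions dynamically could look like this:

-- static partition: the partition value is fixed in the statement
INSERT OVERWRITE TABLE page_views PARTITION (dt='2020-02-11')
SELECT user_id, url, click_time FROM staging_page_views;

-- dynamic partition: the value of dt is taken from the last column of the query result
INSERT INTO TABLE page_views
SELECT user_id, url, click_time, dt FROM staging_page_views;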

■ Other optimizations

In addition to partition pruning, the Hive integration in Flink 1.10 introduces a number of read-path optimizations [19], such as:

  • Projection push-down: Flink minimizes data transfer between Flink and Hive tables by pruning unnecessary fields when scanning a table. This optimization is especially effective for tables with a large number of columns.
  • LIMIT push-down: for queries containing a LIMIT clause, Flink limits the number of records returned wherever possible to reduce the amount of data transferred over the network.
  • Vectorized ORC reading: to improve read performance for ORC files, Flink now uses the native ORC vectorized reader by default for Hive 2.0.0 and above and for columns with non-composite data types.

■ Pluggable modules as Flink system objects (Beta)

Flink 1.10 introduces a general pluggable module mechanism in the Flink table core, currently applied mainly to system built-in functions (FLIP-68 [20]). With modules, users can extend Flink’s set of system objects, for example to use Hive built-in functions as if they were Flink system functions. The new release ships with a pre-implemented HiveModule that supports multiple Hive versions, and users can also write their own pluggable modules [21].
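A rough sketch of how the HiveModule can be loaded from the Table API (the Hive version string, module name, and query are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register Hive's built-in functions next to Flink's own ("core") module.
        tableEnv.loadModule("hive", new HiveModule("2.3.4"));

        // A Hive built-in function can now be called like a Flink system function.
        tableEnv.sqlQuery("SELECT get_json_object('{\"name\":\"flink\"}', '$.name')");
    }
}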

Other Table API/SQL optimizations

■ Watermark and computed columns in SQL DDL

Flink 1.10 adds syntax extensions to SQL DDL for defining time attributes and generating watermarks for stream processing (FLIP-66 [22]). This allows users to perform time-based operations (such as windowing) on tables created with DDL statements and to define watermark strategies [23].

CREATE TABLE table_name (
  ...
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
  ...
)
■ Other SQL DDL extensions

Flink now strictly distinguishes between temporary and persistent functions as well as between system and catalog functions (FLIP-57 [24]). This not only removes ambiguity in function references, but also establishes a deterministic resolution order (for example, in case of naming conflicts, system and temporary functions take precedence over catalog and persistent functions).

Building on FLIP-57, we extended the SQL DDL syntax to support the creation of catalog functions, temporary functions, and temporary system functions (FLIP-79 [25]):

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA]
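A short illustration; the function names and the implementing class are hypothetical:

-- persistent catalog function
CREATE FUNCTION IF NOT EXISTS my_catalog.my_db.to_upper AS 'com.example.udf.ToUpper' LANGUAGE JAVA

-- temporary system function, resolved ahead of built-in functions with the same name
CREATE TEMPORARY SYSTEM FUNCTION to_upper AS 'com.example.udf.ToUpper' LANGUAGE JAVA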
For full Flink SQL DDL support, please refer to the latest documentation [26].

Note: In order to handle meta-objects (tables, views, functions) properly and consistently in the future, Flink deprecated some object declaration methods in the Table API in favor of methods that are closer to standard SQL DDL (FLIP-64 [27]).

■ Full TPC-DS coverage for batch

TPC-DS is a widely used, industry-standard decision support benchmark for measuring the performance of SQL-based data processing engines. Flink 1.10 supports all TPC-DS queries end-to-end (FLINK-11491 [28]), showing that the Flink SQL engine is now ready for modern data warehousing and similar workloads.

PyFlink: support for native Python user-defined functions (UDFs)

As a first step towards full Python support in Flink, a preview version of PyFlink was released in the previous release. In the new release, we focused on enabling users to register and use Python user-defined scalar functions in the Table API/SQL (UDTF and UDAF support is planned) (FLIP-58 [29]).
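A minimal sketch of what a Python scalar UDF looks like in the 1.10 API; the table environment t_env and the source/sink tables are assumed to exist already:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

# register the Python UDF and use it in SQL (t_env is an existing TableEnvironment)
t_env.register_function("add", add)
t_env.sql_update("INSERT INTO my_sink SELECT add(a, b) FROM my_source")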







If you are interested in the underlying implementation of this feature, which is based on Apache Beam’s portability framework [30], please refer to the architecture sections of FLIP-58 and FLIP-78 [31]. This infrastructure also lays the foundation for Pandas support and for bringing PyFlink to the DataStream API in the future.

Starting with Flink 1.10, users can easily install PyFlink via pip by executing the following command:

pip install apache-flink
For more of the optimizations planned for PyFlink, please refer to FLINK-14500 [32]; you are also welcome to join the discussion on user requirements [33].

Important changes

  • FLINK-10725 [34]: Flink can now be compiled and run on Java 11.
  • FLINK-15495 [35]: The SQL Client now uses the Blink planner by default, giving users access to the latest features and optimizations. The Table API is also scheduled to switch from the old planner to the Blink planner in the next release, so we recommend that users start trying out and getting familiar with the Blink planner now.
  • FLINK-13025 [36]: The new Elasticsearch sink connector [37] fully supports Elasticsearch 7.x.
  • FLINK-15115 [38]: The connectors for Kafka 0.8 and 0.9 have been marked as deprecated and are no longer actively supported. If you are still using these versions or have other questions about them, please contact us on the @dev mailing list.
  • FLINK-14516 [39]: The non-credit-based network flow control code has been removed, along with the configuration option “taskmanager.network.credit-model”. From now on, Flink will always use credit-based network flow control.
  • FLINK-12122 [40]: In Flink 1.5.0, FLIP-6 [41] changed how slots are distributed across TaskManagers. To use the previous scheduling strategy, which spreads the load out as evenly as possible across all currently available TaskManagers, set “cluster.evenly-spread-out-slots: true” in flink-conf.yaml.
  • FLINK-11956 [42]: The s3-hadoop and s3-presto file systems no longer use class relocation and are instead loaded as plugins, while seamlessly integrating all authentication providers. We strongly recommend that other file systems also be loaded only via the plugin mechanism, as relocation-based loading will continue to be removed.
  • Flink 1.9 shipped a new Web UI while keeping the old one around as a fallback. Since no problems with the new UI have been reported so far, the community voted [43] to remove the old Web UI in Flink 1.10.

Release notes

Users upgrading to Flink 1.10 should refer to the detailed list of changes and new features in the release notes [44]. For APIs annotated with @Public, this version is compatible with the previous 1.x releases.




