Flink 1.11 will be officially announced soon! To satisfy your curiosity and expectations, we invited core Flink developers to explain and share the features of the new version. Flink 1.11 improves on many aspects of 1.10 and aims to further improve Flink's usability and performance.
Compiled by Gao Yun and Cheng Hequn; reviewed by Zhijiang Wang
This article details the new features, improvements, important changes, and future plans of version 1.11. Check out the corresponding FLIP or Jira pages for more information, and stay tuned for our upcoming live sessions.
Cluster deployment and resource management
Cluster deployment
1. [FLIP-85] Flink supports Application Mode
Previously, Flink generated the JobGraph and submitted jobs through a separate client. In practice this causes problems such as the job JAR download consuming a large amount of bandwidth on the client machine, and the need to start a separate client process that occupies resources Flink does not manage. To address these issues, Flink 1.11 provides a new Application mode, which moves JobGraph generation and job submission to the Master node.
Application mode is used via bin/flink run-application and currently supports Yarn and K8s deployments. In Yarn Application mode, all dependencies required to run the job are shipped to the Flink Master through Yarn Local Resources, and the job is then submitted on the Master side. K8s Application mode lets users build an image containing their JARs and dependencies; TaskManagers are created automatically based on the job, and the whole cluster is torn down when the job finishes.
2. [FLINK-13938][FLINK-17632] Flink on Yarn supports caching remote Flink lib jars and creating jobs from remote jars
Before 1.11, Flink had to upload the jars under its lib directory every time a job was submitted on Yarn, consuming extra storage space and network bandwidth. Flink 1.11 allows users to provide multiple remote lib directories; files in these directories are cached on the Yarn nodes, avoiding unnecessary jar uploads and downloads and making submission and startup faster:
./bin/flink run -m yarn-cluster -d \
-yD yarn.provided.lib.dirs=hdfs://myhdfs/flink/lib,hdfs://myhdfs/flink/plugins \
examples/streaming/WindowJoin.jar
In addition, 1.11 allows users to create jobs directly using jars on remote file systems, further reducing the overhead of Jar downloads:
./bin/flink run-application -p 10 -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/flink/lib" \
hdfs://myhdfs/jars/WindowJoin.jar
3. [FLINK-14460] Enhanced Flink on K8s features
In 1.11, Flink supports the Application mode proposed in FLIP-85 on K8s, which offers better isolation than Session mode.
In addition, support has been added for more K8s features, such as node selectors, labels, annotations, and tolerations. To ease integration with Hadoop, the Hadoop configuration is mounted automatically based on environment variables.
4. [FLIP-111] Docker image unification
The Flink project previously provided a number of different Dockerfiles for building Flink Docker images; these are now unified in the apache/flink-docker [1] project.
5. [FLINK-15911][FLINK-15154] Support configuring the locally bound network interface and the externally visible IP address and port separately
In some scenarios (such as Docker or NAT port mapping), the local network address and port seen by the JM/TM process can differ from the address and port other processes use to reach it from the outside. Previously, Flink did not allow different local and remote addresses to be set for TM/JM, which caused problems in the NAT networks used by Docker and similar setups and made it impossible to limit the exposure of listening ports.
1.11 introduces separate options for the externally visible address/port and the locally bound address/port. Specifically,
- jobmanager.rpc.address
- jobmanager.rpc.port
- taskmanager.host
- taskmanager.rpc.port
- taskmanager.data.port
configure the address and port used by other processes to reach the JM/TM, while
- jobmanager.bind-host
- jobmanager.rpc.bind-port
- taskmanager.bind-host
- taskmanager.rpc.bind-port
- taskmanager.data.bind-port
configure the locally bound listening address and port.
Resource management
1. [FLINK-16614] Unified JM memory resource configuration
One big change in Flink 1.10 was the redefinition of the TM memory model and configuration rules [2]. Flink 1.11 adjusts the JM memory model and configuration rules in the same way, unifying JM memory configuration with that of the TM.
For the specific memory configuration options, refer to the corresponding user documentation [3].
2. [FLIP-108] Added scheduling support for extended resources (such as GPUs)
With the development of machine learning and deep learning, more and more Flink jobs embed machine learning or deep learning models, creating demand for GPU resources. Prior to 1.11, Flink could not manage extended resources such as GPUs. To address this, 1.11 provides a unified framework for managing extended resources, with built-in support for GPU resources.
For further details on the extended resource management framework and GPU resource configuration, refer to the Public Interface section of the corresponding FLIP page: cwiki.apache.org/confluence/… (the corresponding user documentation is still being prepared by the community and will be available later).
3. [FLINK-16605] Allow users to limit the maximum number of slots of a batch job
To prevent Flink batch jobs from occupying too many resources, Flink 1.11 introduces a new configuration option, slotmanager.number-of-slots.max, which limits the maximum number of slots of the whole Flink cluster. This parameter is only recommended for batch Table/SQL jobs using the Blink planner.
Web UI enhancements
1. [FLIP-103] Improved JM/TM log presentation on the Web UI
Previously, users could only read the .log and .out logs through the Web UI, while other files, such as GC logs, might also exist in the log directory. The new interface allows users to access all files in the log directory. In addition, log reloading, downloading, and full-screen display have been added.
2. [FLIP-99] Display more historical failover exceptions
Previously, the Web UI could only show 20 historical failover exceptions for a single job. When failovers are frequent, the initial exception (which is more likely to be the root cause) is quickly buried, making troubleshooting harder. The new Web UI supports paging through a larger number of historical exceptions.
3. [FLINK-14816] Allow users to trigger thread dumps directly from the Web UI
Thread dumps are very helpful for locating problems in some jobs. Before 1.11, users had to log in to the TM machine to take a thread dump. 1.11 integrates this capability into the Web UI: a new Thread Dump tab lets users obtain a TM thread dump directly from the Web UI.
Source & Sink
1. [FLIP-27] New Source API
FLIP-27 is a major feature of 1.11. Flink's traditional Source interface has several problems: different sources have to be implemented for streaming and batch jobs, there is no unified data partition discovery logic, source implementers have to handle locking themselves, and the lack of a common framework forces source developers to manage multiple threads manually. These problems make implementing sources in Flink difficult.
FLIP-27 introduces a whole new set of Source interfaces that provide unified data partition discovery and management. Users only need to focus on reading partition information and reading the data itself, without dealing with complex thread-synchronization issues. This greatly reduces the burden of implementing a source and also lays the foundation for providing more built-in source functionality in the future.
2. [FLINK-11395][FLINK-10114] StreamingFileSink adds support for the Avro and ORC formats
1.11 adds support for two common file formats, Avro and ORC, to the widely used StreamingFileSink.
Avro:
stream.addSink(StreamingFileSink.forBulkFormat(
    Path.fromLocalFile(folder),
    AvroWriters.forSpecificRecord(Address.class)).build());
ORC:
OrcBulkWriterFactory<Record> factory = new OrcBulkWriterFactory<>(
    new RecordVectorizer(schema), writerProps, new Configuration());
stream.addSink(StreamingFileSink
    .forBulkFormat(new Path(outDir.toURI()), factory)
    .build());
State management
1. [FLINK-5763] Changed the Savepoint file structure so that Savepoints are self-contained and relocatable
Flink 1.11 replaces the absolute file paths in Savepoints with relative paths, allowing users to move a Savepoint directly without manually editing the paths in its metadata. (Note that this is not supported when Entropy Injection is enabled on the S3 file system.)
2. [FLINK-8871] Added a checkpoint failure callback and TM-side notification
Prior to 1.11, Flink only provided notification when a checkpoint succeeded. 1.11 adds a mechanism that notifies the TM side when a checkpoint fails or is aborted: on the one hand, the checkpoint still in progress on the TM can be cancelled; on the other hand, user code can receive the notification through the new notifyCheckpointAborted method added to CheckpointListener, as sketched below.
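Below is a minimal sketch of how user code might react to the new notification, assuming a hypothetical transactional sink (only notifyCheckpointAborted is the new 1.11 addition; the buffering and commit logic is purely illustrative):
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink that buffers writes per checkpoint and reacts to
// checkpoint notifications; for illustration only.
public class TransactionalSink extends RichSinkFunction<String> implements CheckpointListener {

    @Override
    public void invoke(String value, Context context) {
        // buffer the record for the currently open transaction (omitted)
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // commit everything buffered for this checkpoint
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // new in 1.11: discard buffers / roll back work tied to the aborted checkpoint
    }
}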
3. [FLINK-12692] Heap keyed state backend can spill data to disk
(This feature is not actually part of the Flink 1.11 code base, but users can download it from flink-packages.org/packages/sp…)
The heap state backend can achieve better performance because it maintains state directly as Java objects. However, its memory footprint is not controllable and can cause serious GC problems.
To solve this problem, SpillableKeyedStateBackend supports spilling data to disk, allowing the state backend to limit its memory usage. For more information about SpillableKeyedStateBackend, see flink-packages.org/packages/sp…
4. [FLINK-15507] Local recovery is enabled by default for the RocksDB state backend
With local recovery enabled by default, failover becomes faster.
5. Change the default value of state.backend.fs.memory-threshold to 20K
(This is a work in progress, but should be included in 1.11)
state.backend.fs.memory-threshold determines the state size above which the FsStateBackend writes state data out to files instead of keeping it in memory. The previous default of 1K led to large numbers of small files in many cases and hurt state access performance, so it has been raised to 20K in 1.11. Note that this change may increase JM memory usage, especially when operator parallelism is high or UnionState is used. [4]
Table & SQL
1. [FLIP-65] Optimized the type inference mechanism for Table API UDFs
Compared with the previous type inference mechanism, the new one provides more type information about the input arguments, allowing users to implement more flexible processing logic. Currently this is supported for scalar functions (UDF) and table functions (UDTF); aggregate functions (UDAF) are not yet supported.
2. [FLIP-84] Optimized the TableEnvironment interface
Flink 1.11 enhances the TableEnvironment in the following ways (see the sketch after this list):
- Previously, sqlUpdate behaved differently for DDL and DML: the former was executed immediately, while the latter only ran when env.execute was called. In 1.11 both are executed uniformly through executeSql.
- Adds support for statements that need to return results, such as SHOW TABLES, EXPLAIN, and so on.
- Adds support for buffering multiple SQL statements and executing them together.
- Adds a collect method that allows users to fetch query results.
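A minimal sketch of the new unified entry point and client-side result fetching (the table uses the datagen connector as a stand-in source; names and options are illustrative):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class ExecuteSqlExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // DDL and DML now go through the same eagerly executed entry point.
        tEnv.executeSql(
                "CREATE TABLE Orders (id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen')");

        // Queries return a TableResult whose rows can be fetched on the client via collect().
        TableResult result = tEnv.executeSql("SELECT id, amount FROM Orders");
        try (CloseableIterator<Row> it = result.collect()) {
            for (int i = 0; i < 5 && it.hasNext(); i++) {
                System.out.println(it.next());
            }
        }
    }
}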
3. [FLIP-93] Support for JDBC- and Postgres-based catalogs
Before 1.11, when users used Flink to read from or write to relational databases or to read change logs, they had to manually copy the database's table schemas into Flink. This process is tedious and error-prone and greatly increases the cost of use. 1.11 provides JDBC- and Postgres-based catalogs that let Flink read table schemas automatically, reducing the amount of manual work.
4. [FLIP-105] Added support for changelog (CDC) sources
Importing changing data from external systems (such as a MySQL binlog or a Kafka compacted topic) into Flink via Change Data Capture (CDC), as well as writing Flink's update/retract streams to external systems, are features users have long been waiting for. Flink 1.11 adds support for reading and writing CDC data, currently in both the Debezium and Canal formats, as sketched below.
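For example, a changelog source backed by Debezium records in Kafka might be declared roughly as follows (a hedged sketch: the topic, fields, and addresses are made up, and the Kafka SQL connector plus the debezium-json format must be on the classpath):
// tEnv is a TableEnvironment as created in the earlier sketch.
tEnv.executeSql(
    "CREATE TABLE user_changes (" +
    "  id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'mysql.users'," +                        // hypothetical topic name
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'demo'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'debezium-json'" +                      // or 'canal-json'
    ")");

// Insert/update/delete events from the binlog are interpreted as a changelog,
// so the table can be aggregated or joined like any other dynamic table.
tEnv.executeSql("SELECT name, COUNT(*) AS cnt FROM user_changes GROUP BY name");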
5. [FLIP-95] New TableSource and TableSink interfaces
The new interfaces simplify the current TableSource/TableSink structure, lay the foundation for CDC support, avoid dependencies on the DataStream API, and solve the problem that only the Blink planner could support efficient source/sink implementations.
For the specific interface changes, see:
cwiki.apache.org/confluence/…
6. [FLIP-122] Reworked connector configuration options
FLIP-122 reorganizes the "WITH" configuration options of the Table/SQL connectors. For historical reasons these options contained redundancies and inconsistencies; for example, all keys started with the "connector." prefix, and different connectors followed different naming patterns. The reworked options resolve these issues. (Note that the existing options remain available.)
For the list of new options, see:
cwiki.apache.org/confluence/…
7. [FLIP-113] Flink SQL supports dynamic table options
Dynamic table options allow users to modify a table's options on the fly when using the table, avoiding the need to re-declare the table's DDL just because an option changed. As shown below, dynamic options let users override option values from the DDL at query time with the /*+ OPTIONS('k1'='v1') */ hint syntax.
SELECT *
FROM
  EMP /*+ OPTIONS('k1'='v1', 'k2'='v2') */
JOIN
  DEPT /*+ OPTIONS('a.b.c'='v3', 'd.e.f'='v4') */
ON
  EMP.deptno = DEPT.deptno
8. [FLIP-115] Added Flink SQL support for the FileSystem connector and Hive
- Provides support for five formats (CSV, ORC, Parquet, JSON, Avro) in the FileSystem connector, and full FileSystem connector support for both batch and streaming (see the sketch after this list).
- Provides support for a Hive streaming sink.
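A rough sketch of a streaming SQL job writing partitioned files through the new FileSystem connector (the path, schema, and source table are hypothetical, and the parquet format jar is assumed to be available):
// tEnv as above.
tEnv.executeSql(
    "CREATE TABLE fs_sink (" +
    "  name STRING," +
    "  cnt BIGINT," +
    "  dt STRING" +
    ") PARTITIONED BY (dt) WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'hdfs://myhdfs/path/to/output'," +        // hypothetical output path
    "  'format' = 'parquet'" +
    ")");

// some_source is a previously declared table; results are continuously written
// into partitioned parquet files.
tEnv.executeSql("INSERT INTO fs_sink SELECT name, cnt, dt FROM some_source");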
9. [FLIP-123] Supports Hive-compatible DDL and DML statements
FLIP-123 adds support for a Hive dialect, which lets users work with Hive DDL and DML statements, as sketched below.
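A rough sketch of switching dialects (it assumes a HiveCatalog has already been registered and selected; the table definition is illustrative):
import org.apache.flink.table.api.SqlDialect;

// tEnv is a TableEnvironment with a HiveCatalog registered and set as the current catalog.
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Hive-style DDL can now be used directly.
tEnv.executeSql(
    "CREATE TABLE logs (line STRING) " +
    "PARTITIONED BY (dt STRING) " +
    "STORED AS PARQUET");

// Switch back to Flink's default dialect when needed.
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);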
DataStream API
1. [FLINK-15670] Kafka Shuffle: use Kafka as a message bus between operators to exchange and persist data at the same time
Flink Kafka Shuffle provides a DataStream API that uses Kafka as a message bus between Flink operators, exchanging the data and persisting it at the same time. The benefits of this approach are:
- The shuffled data can be reused.
- During job failure recovery, the persisted data decouples upstream from downstream failure regions, avoiding a full restart while still maintaining exactly-once semantics.
This mechanism can serve as a supplement for large-scale streaming job failure recovery until Flink's failover refactoring is completed.
2. [FLIP-126] Optimized the Source WatermarkAssigner interface
(Note that the work is done, but whether it will be included in 1.11 is still under discussion.)
The new WatermarkAssigner interface merges the two previous watermark interfaces, AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks, which simplifies watermark support when implementing sources in subsequent development.
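Should the unified WatermarkStrategy proposed in FLIP-126 ship as described, usage would roughly look like the following sketch (the Tuple2 stream, field choice, and five-second bound are illustrative):
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// events is a DataStream<Tuple2<Long, String>> whose first field is the event timestamp.
DataStream<Tuple2<Long, String>> withWatermarks = events.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, recordTimestamp) -> event.f0));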
3. [FLIP-92] Supports operators with more than two inputs
Flink 1.11 provides support for multiple-input operators. However, this feature does not yet come with a complete DataStream API; users who want to use it need to create the operator manually via MultipleInputTransformation and MultipleConnectedStreams:
MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
    "My Operator",
    new SumAllInputOperatorFactory(),
    BasicTypeInfo.LONG_TYPE_INFO,
    1);

env.addOperator(transform
    .addInput(source1.getTransformation())
    .addInput(source2.getTransformation())
    .addInput(source3.getTransformation()));

new MultipleConnectedStreams(env)
    .transform(transform)
    .addSink(resultSink);
PyFlink & ML
1. [FLINK-15636] Support for running Python UDFs in the batch mode of the Flink planner
Previously, Python UDFs could run in the Blink planner's streaming and batch modes and in the Flink planner's streaming mode. With this change, both planners support running Python UDFs in both streaming and batch modes.
2. [FLINK-14500] Python UDTF support
Python UDTFs allow a single input row to produce multiple output rows. Both planners support running Python UDTFs in streaming and batch modes.
3. [FLIP-121] Optimized Python UDF execution efficiency through Cython
The computation logic of the coder (serialization/deserialization) and the operations was optimized with Cython, improving end-to-end performance by tens of times compared with version 1.10.
4. [FLIP-97] Pandas UDF support
Pandas UDFs use pandas.Series as the input and output type and process data in batches. In general, Pandas UDFs perform better than regular UDFs because they reduce the serialization and deserialization overhead of exchanging data between the Java and Python processes, and because processing data in batches reduces the number of Python UDF invocations and the per-call overhead. In addition, Pandas UDFs make it easier and more natural for users to work with Pandas-related Python libraries.
5. [FLIP-120] Supports conversion between PyFlink Table and Pandas DataFrame
You can obtain a Pandas DataFrame from a Table object using the to_pandas() method, or convert a Pandas DataFrame into a Table object using the from_pandas() method.
import pandas as pd
import numpy as np
# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
6. [FLIP-112] Supports user-defined metrics in Python UDFs
Four custom metric types are currently supported: Counters, Gauges, Meters, and Distributions. Defining user scopes and user variables for metrics is also supported.
7. [FLIP-106][FLIP-114] Supports using Python UDFs in SQL DDL and the SQL Client
Previously, Python UDFs could only be used in the Python Table API. With support for registering Python UDFs via DDL, SQL users can now use them easily as well. In addition, the SQL Client supports Python UDFs, including their registration and dependency management.
8. [FLIP-96] Supports the Python Pipeline API
Flink 1.9 introduced a new ML Pipeline API to improve the ease of use and scalability of Flink ML. Since Python is widely used in the ML field, FLIP-96 provides a corresponding Python Pipeline API for the convenience of Python users.
Runtime optimization
1. [FLIP-76] Supports unaligned checkpoints
Under Flink's existing checkpoint mechanism, each operator has to wait until the barriers from all upstream channels are aligned before it can take a snapshot and forward the barrier downstream. Under backpressure, barriers may take a long time to travel from upstream to downstream operators, causing checkpoints to time out.
To address this issue, Flink 1.11 adds unaligned checkpoints. With unaligned checkpoints enabled, an operator starts its checkpoint as soon as the first barrier is received, and the data in flight between upstream and downstream operators is saved into the snapshot as state. This shortens checkpoint completion time, makes it largely independent of operator throughput, and solves the problem of checkpoints not completing for long periods under backpressure.
The unaligned checkpoint mechanism can be enabled via env.getCheckpointConfig().enableUnalignedCheckpoints().
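A minimal sketch of enabling it (the 60-second checkpoint interval is just an example value):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 seconds and allow checkpoints to overtake in-flight data.
env.enableCheckpointing(60_000);
env.getCheckpointConfig().enableUnalignedCheckpoints();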
2. [FLINK-13417] Supports ZooKeeper 3.5
Flink now integrates with ZooKeeper 3.5, allowing users to take advantage of new ZooKeeper features such as SSL.
3. [FLINK-16408] Supports slot-level ClassLoader reuse
Flink 1.11 changes the TM ClassLoader handling: instead of creating a new ClassLoader after every failover, 1.11 caches and reuses the ClassLoader as long as the job still occupies a slot on that TM. This slightly changes failover semantics, because static fields are not reloaded after a failover, but it avoids the JVM metaspace exhaustion caused by creating large numbers of ClassLoaders.
4. [FLINK-15672] Upgraded the logging system to Log4j 2
Flink 1.11 upgrades its logging system from Log4j to Log4j 2.x, which resolves some of the problems of Log4j 1.x and makes the new features of 2.x available.
5. [FLINK-10742] Reduced data copies and memory usage on the TM receiver side
Flink 1.11 reuses Flink's own buffer memory management when the downstream network stack receives data, which removes the extra copy from the Netty layer into Flink's buffers and the associated direct-memory overhead. This lowers the chance of direct memory OOMs or of containers being killed for exceeding their memory limits.
The above is a forward-looking interpretation of the new features of Flink 1.11; the community will continue to organize related content and technical sharing sessions, so stay tuned!
References:
[1] github.com/apache/flin…
[2] ci.apache.org/projects/fl…
[3] ci.apache.org/projects/fl…
[4] lists.apache.org/thread.html…