The AIOps community, initiated by Cloud Intelligence, targets operation and maintenance (O&M) business scenarios. It provides an overall service system of algorithms, computing power, and data sets, as well as a community for exchanging solutions for intelligent O&M scenarios. The community is committed to spreading AIOps technology, solving the technical problems of the intelligent O&M industry together with customers, users, researchers, and developers from various industries, promoting the adoption of AIOps technology in enterprises, and building a healthy, win-win AIOps developer ecosystem.
Flink SQL dynamic table
Create a Kafka dynamic table
Create a Kafka dynamic table in Flink, mapping the Kafka topic's metadata timestamp to a create_time column as the last field of the table. A sketch of the resulting CREATE TABLE statement is shown after the connection details below.
- Kafka address: 10.2.3.14:9092
- Topic name: r_01
- Consumer Group ID: 8001
- Example data: order 1 purchases product 1, with a transaction amount of 1 yuan
{"order_id":1,"product_id":1,"trans_amount":1}
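A minimal sketch of what this Kafka dynamic table might look like in Flink SQL, assuming the table name kafka_source and JSON format (field names follow the example data; the actual DDL in the original article may differ):

```sql
-- Sketch of the Kafka source table; the metadata timestamp is exposed as the last field, create_time.
CREATE TABLE kafka_source (
    order_id     BIGINT,
    product_id   BIGINT,
    trans_amount DECIMAL(16, 2),
    create_time  TIMESTAMP(3) METADATA FROM 'timestamp',       -- Kafka message metadata as the last field
    WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'r_01',
    'properties.bootstrap.servers' = '10.2.3.14:9092',
    'properties.group.id' = '8001',
    'format' = 'json'
);
```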
Create a ClickHouse dynamic table
Create a ClickHouse dynamic table in Flink. The ClickHouse table structure contains the data types and primary-key information of the relevant fields; the Flink SQL CREATE TABLE statement must declare the corresponding fields, data formats, and primary key. The WITH clause contains the ClickHouse connection information and the configuration for data operations.
- Clickhouse address: 10.2.3.14:8123
- Database name: default
- Data table: product_sale
- Table structure and sample data are as follows:
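A minimal sketch of the corresponding Flink SQL table, assuming the columns implied by the case study later in this article (product_id, product_name, create_minute, trans_amount); the actual structure, sample data, and connector option keys may differ depending on the ClickHouse connector version in use:

```sql
-- Sketch of the ClickHouse sink table; columns and option keys are illustrative.
CREATE TABLE clickhouse_sink (
    product_id    BIGINT,
    product_name  STRING,
    create_minute STRING,
    trans_amount  DECIMAL(16, 2),
    PRIMARY KEY (product_id, create_minute) NOT ENFORCED    -- mirrors the primary key of the ClickHouse table
) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://10.2.3.14:8123',
    'database-name' = 'default',
    'table-name' = 'product_sale'
);
```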
Create a Redis dynamic table
Create a Redis dynamic table in Flink. Because the Redis table is used as a dimension table, it must contain primary keys for data association and common fields for data completion. The CREATE TABLE statement must therefore declare one or more primary keys and one or more common fields. Data is stored in Redis in HASH format and can be viewed with HGETALL.
- Redis address: 10.2.3.39:3301
- Key prefix: index:product_sale
- Redis example data is as follows:
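A minimal sketch of the Redis dimension table, assuming product_id as the primary key and product_name as the common field; the connector option keys depend on the Redis connector used by the platform:

```sql
-- Sketch of the Redis dimension table; field names and option keys are illustrative.
CREATE TABLE redis_dim (
    product_id   BIGINT,        -- primary key used for data association
    product_name STRING,        -- common field used for data completion
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '10.2.3.39',
    'port' = '3301',
    'key-prefix' = 'index:product_sale'
);
-- A row with product_id = 1 would be stored as a HASH and could be inspected with HGETALL
-- on a key such as index:product_sale:1 (the exact key layout depends on the key spacer).
```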
Flink SQL connection parameters
Connecting to Kafka
- connector: specifies the connector type; the fixed value is kafka.
- topic: the topic from which data is consumed or to which data is written.
- bootstrap.servers: the Kafka connection address; multiple addresses can be given, separated by commas.
- Broker addresses: when the cluster is healthy, data can be read even if only one or a few broker nodes are listed. When there are many Kafka nodes and few topic partitions, a single node can be listed, and data can still be read even if the topic's partitions are not on that node. Note, however, that listing multiple broker addresses improves resilience if individual nodes fail during a Kafka service incident.
- Consumption modes (scan.startup.mode):
  - earliest-offset corresponds to "consume from the beginning" in the platform. Every time the task starts it reads all existing data in the topic from front to back. This order is only relative: if a topic has multiple partitions and the data is skewed, the partitions with less data advance faster in terms of event time, because Kafka data is read partition by partition.
  - latest-offset corresponds to "consume from the latest" in the platform. The task starts from the newest data.
  - group-offsets corresponds to "consume from the group offset" in the platform. In this mode a task reads the latest data when it starts for the first time, and after a restart it picks up where the previous run left off. This is also the default consumption mode of Kafka consumers. The group ID must be set in this mode, because Kafka records the offset of the data processed under that group ID. Since Kafka stores the offsets, the group ID can be shared across platforms and applications. For example, if a Java task that persists Kafka data is reimplemented in Flink, the Flink task can use the same group ID as the Java task to switch over smoothly, with no data lost and no data duplicated.
- Group ID: records the offset of processed data so that consumption can resume from the breakpoint when a task restarts or recovers from a failure.
- format: the Kafka message format. CSV, JSON, RAW, and CDC formats are commonly used.
- ignore-parse-errors: parsing can fail for many reasons, for example when a message is not valid JSON; in that case the whole record is discarded. Note that if only a single field of a JSON message differs from the type in the table definition, the failed conversion affects only that field and does not affect the parsing of the other fields.
- fail-on-missing-field: whether to fail the Flink task when a Kafka message lacks a field defined in the CREATE TABLE statement. A sketch of how these options appear together in a WITH clause follows below.
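A sketch of how these options might appear together in a Kafka source table (option keys are those of the open-source Flink Kafka connector; table and column names are illustrative):

```sql
CREATE TABLE kafka_orders (
    order_id     BIGINT,
    product_id   BIGINT,
    trans_amount DECIMAL(16, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'r_01',
    'properties.bootstrap.servers' = '10.2.3.14:9092',   -- multiple addresses may be listed, comma-separated
    'properties.group.id' = '8001',
    'scan.startup.mode' = 'group-offsets',                -- or 'earliest-offset' / 'latest-offset'
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',                  -- discard messages that are not valid JSON
    'json.fail-on-missing-field' = 'false'                -- do not fail when a declared field is missing
);
```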
Connecting to ClickHouse
- url: a single JDBC URL can be given to write to the logical (distributed) table of the cluster, or multiple JDBC URLs, separated by commas, to write to the ClickHouse local tables in a round-robin manner.
- table-name: only one table name can be given. When writing to local tables in round-robin mode, the URL connections and databases may be the same or different (for example, different databases under the same URL), but the table name must be the same everywhere.
- flush-max-rows: the number of rows buffered before flushing, used in combination with the synchronous, semi-synchronous, and asynchronous output modes of the Flink-to-ClickHouse sink. A sketch using these options follows below.
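A sketch of a ClickHouse table written in round-robin mode; the option keys follow the parameter names described above and may differ between ClickHouse connector versions (the second URL is purely illustrative):

```sql
CREATE TABLE clickhouse_local_sink (
    product_id    BIGINT,
    product_name  STRING,
    create_minute STRING,
    trans_amount  DECIMAL(16, 2)
) WITH (
    'connector' = 'clickhouse',
    -- multiple JDBC URLs, comma-separated, write to the local tables in round-robin fashion
    'url' = 'jdbc:clickhouse://10.2.3.14:8123,jdbc:clickhouse://10.2.3.15:8123',
    'database-name' = 'default',
    'table-name' = 'product_sale',     -- the table name must be the same behind every URL
    'flush-max-rows' = '1000'          -- rows buffered before each flush
);
```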
Connecting to Redis
Only the Hash structure of Redis is supported. The detailed data structure is as follows:
Hash key: {key-prefix}{key-spacer}{k1}{key-spacer}{k2}
Hash field: Any field in the schema other than the key field
Hash Value: stores the values of fields other than keys. Flink Redis Schema supports the following types: STRING, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, and DOUBLE
Flink SQL function
Flink SQL functions are divided into built-in functions and custom functions
Custom functions:
ScalarFunction: row-level data processing; takes one or more columns of a row as input and outputs a single value
TableFunction: also row-level data processing; accepts one or more input parameters but can output multiple rows and columns
AggregateFunction: an aggregate function used with GROUP BY; computes and outputs a single metric value from multiple rows and columns of input
TableAggregateFunction: a table aggregate function used with GROUP BY; computes and outputs multiple rows and columns from multiple rows and columns of input. It cannot be used in Flink SQL and is only available through the Flink Table API
Operation example:
Here we look up the name of a fruit by its ID to demonstrate the use of a custom function. UDFs are developed in Java and can take advantage of Java language features such as polymorphism. The class in the example extends Flink's ScalarFunction and implements an eval method, so the custom function is easy to write; thanks to Java's method overloading, several eval methods can be provided to handle different input types. Package the custom function as a jar, upload it to the platform's resource library page, and import it with CREATE TEMPORARY FUNCTION when writing a data processing task, as sketched below.
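After the jar has been uploaded to the resource library, the function can be registered and used from Flink SQL roughly as follows (the function name and the fully qualified class name are illustrative):

```sql
-- Register the uploaded UDF and use it to look up a fruit's name by its ID.
CREATE TEMPORARY FUNCTION get_product_name AS 'com.example.udf.GetProductName';

SELECT
    product_id,
    get_product_name(product_id) AS product_name
FROM kafka_source;
```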
Flink SQL case
Requirement description
Calculate the per-minute sales amount of each fruit in topic r_01, and output the calculated results to Kafka, ClickHouse, and Redis.
{"order_id":1, "product_id":1, "trans_amount":1}
The calculation result should contain the following fields: {"product_id":1, "product_name":"apple", "create_minute":"2021-12-02 12:00:00", "trans_amount":3}
Specific operation
Kafka to Kafka
- Create a Kafka source
- Create a Kafka sink
- Write the data processing SQL (a sketch follows below)
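A sketch of the data processing SQL for the Kafka-to-Kafka path, assuming the kafka_source table and get_product_name UDF sketched earlier and a kafka_sink result table with the four output fields; the ClickHouse and Redis paths use the same query with a different INSERT INTO target:

```sql
-- Per-minute sales per fruit, written to the Kafka result table.
INSERT INTO kafka_sink
SELECT
    product_id,
    get_product_name(product_id) AS product_name,     -- UDF from the custom-function section
    DATE_FORMAT(TUMBLE_START(create_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS create_minute,
    SUM(trans_amount) AS trans_amount
FROM kafka_source
GROUP BY product_id, TUMBLE(create_time, INTERVAL '1' MINUTE);
```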
Kafka to ClickHouse
- Create a Kafka source
- Create a ClickHouse sink
- Write the data processing SQL
Kafka to Redis
- Create a Kafka source
- Create a Redis sink
- Write the data processing SQL
Checkpoint
Role of state
A checkpoint is the point at which Flink persists the state of a task. State is a first-class citizen in Flink: it lets a program remember the intermediate results of a run, so that the task can restart and recover if it fails.
Flink application exception example
- A program that calculates the day's total order amount in real time is interrupted abnormally; by recovering from state it does not need to recalculate from zero.
- A real-time ETL job that synchronizes Kafka data to external storage is interrupted abnormally; by recovering from state it does not need to consume from the beginning.
Storage of state data
- State can be kept in memory, but if the state data grows too large the job may run out of memory (OOM)
- State can be stored in a persistent file system, such as a local file system or HDFS
- The size of a Flink application's state is controlled by the state expiration time
- State data needs to be saved periodically so that it can be used for failure recovery
How do I recover from checkpoints
- Read the state data from the last checkpoint, for example an accumulated sales sum of 8,000 yuan
- Read the offsets committed at the last checkpoint, for example partition 0, offset 1000
- Together, this state says that the Flink application had counted 8,000 yuan of sales as of position (0, 1000)
- After the application is restored, consumption resumes from (0, 1001) and the sum continues from 8,000
Consistency of state data within checkpoints
- Consistency semantics of the internal state data: exactly-once or at-least-once
- In the sum example above, exactly-once means each Kafka message is counted into the sum only once; with at-least-once, a message may be counted more than once, so the sum can come out too high
- If the topic has only one partition, the result is exactly-once, because the parallelism of the Flink Kafka source equals the number of partitions, and with parallelism 1 there is no situation where multiple streams arrive at different times
- With multiple Kafka partitions, Flink defaults to multiple parallelism, which is configured here with at-least-once semantics; in addition, it is very likely that the multiple streams will not arrive at the same time, which can inflate the statistics
Checkpoint setup
When creating a Flink streaming task, you can choose whether to enable checkpointing and set the checkpoint period. A checkpoint serves two purposes. First, when a task is automatically restarted after a failure, the system recovers from the checkpoint, so the task continues calculating from the point of failure and data consistency and accuracy are preserved. Second, after manually stopping a task, you can choose whether to continue from the previous checkpoint when starting it again.
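In raw Flink configuration, enabling checkpointing with a one-minute period looks roughly like this (a sketch using the open-source Flink option keys; the platform exposes the same settings in its UI):

```sql
-- Enable a checkpoint every 60 seconds with exactly-once mode (Flink SQL client syntax).
SET 'execution.checkpointing.interval' = '60s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
```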
Checkpoint recovery
When a task is restarted after an interruption, you can choose whether to restore state data from the last checkpoint. The system currently supports fixed-delay and failure-rate restart policies; if the number of failures within a period exceeds the threshold, the task is no longer restarted.
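The two policies correspond to Flink's fixed-delay and failure-rate restart strategies; in raw Flink configuration they look roughly like this (a sketch; the platform normally sets these through its UI):

```sql
-- Fixed-delay restart: retry up to 3 times, waiting 10 seconds between attempts.
SET 'restart-strategy' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '10 s';

-- Failure-rate restart: stop restarting if more than 3 failures occur within 5 minutes.
-- SET 'restart-strategy' = 'failure-rate';
-- SET 'restart-strategy.failure-rate.max-failures-per-interval' = '3';
-- SET 'restart-strategy.failure-rate.failure-rate-interval' = '5 min';
-- SET 'restart-strategy.failure-rate.delay' = '10 s';
```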
To learn more
Cloud Intelligence OMP (Operation Management Platform) is an open-source, lightweight, converged, and intelligent operation and maintenance (O&M) management platform. It provides functions such as management, deployment, monitoring, inspection, self-healing, backup, and recovery, giving users convenient O&M capabilities and service management. In addition to improving the efficiency of O&M personnel, it greatly improves business continuity and security.
Click the links below to give OMP a Star and learn more:
GitHub address: Github.com/CloudWise-O…
Gitee address: Gitee.com/CloudWise/O…