In some scenarios, such as aggregating results with GROUP BY, the previously emitted result needs to be updated. In that case, the key of the Kafka message record must be treated as the primary key to determine whether a piece of data should be handled as an insert, a delete, or an update. In Flink 1.11, this can be done with the changelog-json format provided by the flink-cdc-connectors project; for details on how to use it, see the earlier article on CDC Connectors hands-on practice with Flink 1.11.
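For reference, a minimal sketch of a table that reads such an aggregated changelog from Kafka with the changelog-json format is shown below. It assumes the flink-cdc-connectors changelog-json format jar is on the classpath; the table name, fields, topic, and broker address are placeholders.

CREATE TABLE region_gmv (
  region STRING,
  gmv    DECIMAL(16, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'region_gmv',                        -- placeholder topic
  'properties.bootstrap.servers' = 'kms-2:9092', -- placeholder broker list
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'changelog-json'                    -- format provided by flink-cdc-connectors
);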

Flink 1.12 introduces a new upsert connector, upsert-kafka, which extends the existing Kafka connector and works in upsert mode (FLIP-149). The new upsert-kafka connector can be used as either a source or a sink, and it provides the same basic functionality and persistence guarantees as the existing Kafka connector, since much of the code is shared between the two. This article uses Flink 1.12 as an example to walk through the basic steps of using this feature. I hope you find it helpful.


Upsert Kafka Connector Introduction

The upsert-kafka connector allows users to read data from and write data to Kafka topics in an upsert fashion.

When used as a source, the upsert-kafka connector produces a changelog stream in which each record represents an update or delete event. More precisely, if the key has not been seen before, the record is interpreted as an INSERT; if the key already exists, the record's value is interpreted as the latest update of the value for that key.

Using the table analogy, a record in the changelog stream is interpreted as an UPSERT (INSERT/UPDATE), because any existing row with the same key is overwritten. In addition, a message with a null value is treated as a DELETE.

When used as a sink, the upsert-kafka connector consumes a changelog stream. It writes INSERT/UPDATE_AFTER data as normal Kafka message values, and writes DELETE data as Kafka messages with a null value (a tombstone marking that the message for that key has been deleted). Flink partitions the data by the values of the primary key columns so that messages for the same primary key stay ordered; update/delete messages for the same primary key therefore land in the same partition.
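As a conceptual sketch (the key, fields, and values below are made up for illustration), a sequence of Kafka records keyed by user_region would be interpreted roughly as follows once the changelog is materialized:

-- Kafka record (key => value)            Interpretation in the changelog
-- 'Shanghai' => {pv: 10, uv: 5}          INSERT  (key not seen before)
-- 'Shanghai' => {pv: 12, uv: 6}          UPDATE  (new value replaces the previous one for this key)
-- 'Shanghai' => null                     DELETE  (tombstone: the row for this key is removed)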

Dependencies

To use the upsert-kafka connector, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.0</version>
</dependency>


If you use the SQL Client, download flink-sql-connector-kafka_2.11-1.12.0.jar and place it in the lib folder of the Flink installation directory.

Usage

Usage example

Create a Kafka table to store the sink data:
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.format' = 'avro',
  'value.format' = 'avro'
);


Note:

To use the upsert-kafka connector, you must define a primary key with the PRIMARY KEY constraint when creating the table, and you must specify the serialization/deserialization formats for the key (key.format) and the value (value.format).

Upsert-kafka connector parameters

  • connector

Required. Specifies which connector to use; for the upsert-kafka connector, use 'upsert-kafka'.

  • topic

Required. The name of the Kafka topic to read from and write to.

  • properties.bootstrap.servers

Required. A comma-separated list of Kafka brokers.

  • key.format

Required. The format used to serialize and deserialize the key part of Kafka messages. The key fields are specified by the PRIMARY KEY syntax. Supported formats include 'csv', 'json', and 'avro'.

  • value.format

Required. The format used to serialize and deserialize the value part of Kafka messages. Supported formats include 'csv', 'json', and 'avro'.

  • properties.*

Optional. This option can pass arbitrary Kafka client parameters. The option suffix must match a parameter name defined in the Kafka configuration documentation. Flink automatically removes the 'properties.' prefix from the option name and passes the resulting key and value to the Kafka client. For example, you can disable automatic topic creation with 'properties.allow.auto.create.topics' = 'false'. However, some options, such as 'key.deserializer' and 'value.deserializer', cannot be passed this way because Flink overrides their values.
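For illustration, a minimal sketch of a WITH clause that passes a Kafka client option through the 'properties.' prefix (the table name, topic, and broker address are placeholders):

CREATE TABLE example_upsert_sink (
  k STRING,
  v BIGINT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'example_topic',                          -- placeholder topic
  'properties.bootstrap.servers' = 'kms-2:9092',      -- placeholder broker list
  'properties.allow.auto.create.topics' = 'false',    -- forwarded to the Kafka client as allow.auto.create.topics
  'key.format' = 'json',
  'value.format' = 'json'
);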

  • value.fields-include

Optional. The default value is ALL. This option controls whether the key fields also appear in the value. With ALL, the value part of the message contains all fields of the schema, including those defined as the primary key. With EXCEPT_KEY, the value part contains all fields of the schema except those defined as the primary key.

  • key.fields-prefix

Optional. A custom prefix for the key fields, used to avoid name clashes with the value fields. The prefix is empty by default. Once a key field prefix is specified, the field names in the DDL must carry that prefix, but the prefix is stripped when the serialization data type of the key is built. See the example below. Note that this option requires value.fields-include to be set to EXCEPT_KEY.

Create an upsert table. When the prefix qwe is specified, the key fields involved must carry the qwe prefix:
CREATE TABLE result_total_pvuv_min_prefix (
    qwedo_date  STRING,      -- Statistics date, must carry the qwe prefix
    qwedo_min   STRING,      -- Statistics minute, must carry the qwe prefix
    pv          BIGINT,      -- Number of clicks
    uv          BIGINT,      -- Multiple visits by the same visitor within a day count as one UV
    currenttime TIMESTAMP,   -- Current time
    PRIMARY KEY (qwedo_date, qwedo_min) NOT ENFORCED -- must carry the qwe prefix
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min_prefix',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'qwe',           -- Specify the prefix qwe
  'value.fields-include' = 'EXCEPT_KEY'  -- The key fields do not appear in the Kafka message value
);
-- Write data into the table
INSERT INTO result_total_pvuv_min_prefix
SELECT
  do_date,    -- Time partition
  cast(DATE_FORMAT(access_time, 'HH:mm') AS STRING) AS do_min,  -- Minute-level time
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime  -- Current time
from
  view_total_pvuv_min;

Note:

If a key field prefix is specified but the prefix is not added to the field names in the DDL, the following exception is thrown when data is written to the table:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in ‘key.fields’ must be prefixed with ‘qwe’ when option ‘key.fields-prefix’ is set but field ‘do_date’ is not prefixed.

  • sink.parallelism

Optional. Defines the parallelism of the upsert-kafka sink operator. By default, the framework determines the parallelism, which matches the parallelism of the upstream chained operator.
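A minimal sketch of pinning the sink parallelism with this option (the table name, topic, broker address, and the value 4 are illustrative placeholders):

CREATE TABLE example_parallel_sink (
  user_region STRING,
  pv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'example_parallel_sink',              -- placeholder topic
  'properties.bootstrap.servers' = 'kms-2:9092',  -- placeholder broker list
  'key.format' = 'json',
  'value.format' = 'json',
  'sink.parallelism' = '4'                        -- fix the sink parallelism instead of inheriting it from upstream
);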

Other notes

Serialization format for Key and Value

For the serialization of keys and values, see the Kafka connector documentation. Note that the serialization formats for both the key and the value must be specified, and the key fields are specified by the PRIMARY KEY.

Primary Key constraint

Upsert-kafka works in upsert mode (FLIP-149). When creating the table, the primary key must be defined in the DDL. Data with the same key is stored in the same partition. Defining the primary key on a changelog source means that the primary key is unique on the materialized changelog. The defined primary key also determines which fields appear in the key of the Kafka messages.

Consistency guarantees

By default, when checkpointing is enabled, the upsert-kafka sink guarantees that data is written to the Kafka topic at least once.

This means that Flink may write duplicate records with the same key to the Kafka topic. However, because the connector works in upsert mode, only the last message for each primary key takes effect when the data is read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes, just like an HBase sink.

Per-partition watermarks

Flink supports emitting watermarks per upsert-kafka partition based on the data characteristics of each partition. When using this feature, watermarks are generated inside the Kafka consumer. The per-partition watermarks are merged in the same way as watermarks are merged during a streaming shuffle, so the watermark emitted by the source is determined by the smallest watermark among the partitions the consumer reads. If some of the partitions the consumer is responsible for are idle, the overall watermark does not advance. In this case, the problem can be mitigated by setting an appropriate table.exec.source.idle-timeout.
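For example, a minimal sketch of setting this option in the SQL Client (assuming the SET syntax for configuration options; the 10-second value is an arbitrary illustrative choice):

-- Treat a partition as idle after 10 s without data so that it no longer holds back the watermark
SET table.exec.source.idle-timeout = 10s;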

Data types

Upsert-kafka stores message keys and values as raw bytes, so it has no schema or data types of its own. Messages are serialized and deserialized with formats such as csv, json, and avro. Different formats provide different data types, so you need to check, based on the format you use, whether the data types of the table fields are compatible with the data types provided by that format.

Use case

This section uses real-time statistics of total page views (PV) and unique visitors (UV) as an example to introduce the basic use of upsert-kafka:

  • Kafka data source

The user's IP/PV information; a single user can generate many PVs in a day.

CREATE TABLE source_ods_fact_user_ippv (
    user_id      STRING,       -- User ID
    client_ip    STRING,       -- Client IP address
    client_info  STRING,       -- Device model information
    pagecode     STRING,       -- Page code
    access_time  TIMESTAMP,    -- Request time
    dt           STRING,       -- Time partition day
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND  -- Define the watermark
) WITH (
    'connector' = 'kafka',                                               -- Use the Kafka connector
    'topic' = 'user_ippv',                                               -- Kafka topic
    'scan.startup.mode' = 'earliest-offset',                             -- Start from the earliest offset
    'properties.group.id' = 'group1',                                    -- Consumer group
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
    'format' = 'json',                                                   -- The source data format is JSON
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
  • Kafka Sink table

PV and UV per minute are counted and the results are stored in Kafka

CREATE TABLE result_total_pvuv_min (
    do_date     STRING,      -- Statistics date
    do_min      STRING,      -- Statistics minute
    pv          BIGINT,      -- Number of clicks
    uv          BIGINT,      -- Multiple visits by the same visitor within a day count as one UV
    currenttime TIMESTAMP,   -- Current time
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'  -- The key fields do not appear in the Kafka message value
);
  • Calculation logic
-- Create a view
CREATE VIEW view_total_pvuv_min AS
SELECT
     dt AS do_date,                    -- Time partition
     count(client_ip) AS pv,           -- Count of client IPs (PV)
     count(DISTINCT client_ip) AS uv,  -- Deduplicated client IPs (UV)
     max(access_time) AS access_time   -- Request time
FROM
    source_ods_fact_user_ippv
GROUP BY dt;

-- Write data
INSERT INTO result_total_pvuv_min
SELECT
  do_date,    -- Time partition
  cast(DATE_FORMAT(access_time, 'HH:mm') AS STRING) AS do_min,  -- Minute-level time
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime -- Current time
from
  view_total_pvuv_min;
  • Produce user access data into Kafka by inserting the following records into the user_ippv topic:
{"user_id":"1"."client_ip":"192.168.12.1"."client_info":"phone"."pagecode":"1001"."access_time":"The 2021-01-08 11:32:24"."dt":"2021-01-08"}

{"user_id":"1"."client_ip":"192.168.12.1"."client_info":"phone"."pagecode":"1201"."access_time":"The 2021-01-08 11:32:55"."dt":"2021-01-08"}

{"user_id":"2"."client_ip":"192.165.12.1"."client_info":"pc"."pagecode":"1031"."access_time":"The 2021-01-08 11:32:59"."dt":"2021-01-08"}

{"user_id":"1"."client_ip":"192.168.12.1"."client_info":"phone"."pagecode":"1101"."access_time":"The 2021-01-08 11:33:24"."dt":"2021-01-08"}

{"user_id":"3"."client_ip":"192.168.10.3"."client_info":"pc"."pagecode":"1001"."access_time":"The 2021-01-08 11:33:30"."dt":"2021-01-08"}

{"user_id":"1"."client_ip":"192.168.12.1"."client_info":"phone"."pagecode":"1001"."access_time":"The 2021-01-08 11:34:24"."dt":"2021-01-08"}
  • Query result table:
select * from result_total_pvuv_min;

As you can see, pv and uv show only one row per minute, representing the cumulative PV and UV up to that point in time.

Querying the result_total_pvuv_min topic in Kafka shows the following:

As you can see, every incoming access record triggers a computation of PV and UV, and each emitted record is the cumulative PV and UV up to the current time.

Note:

By default, if a query is executed with checkpointing enabled, the upsert-kafka sink writes data to the Kafka topic with an at-least-once guarantee.

This means that Flink may write duplicate records with the same key to the Kafka topic. However, because the connector works in upsert mode, only the last record for each key takes effect when the data is read back as a source. Thus, the upsert-kafka connector achieves idempotent writes, just like an HBase sink.

Conclusion

The upsert connector (upsert-kafka) allows users to read from and write to tables in Kafka in an upsert fashion. In addition, this article walked through a concrete use case, which should further deepen your understanding of how to use this feature.
