Author: Zhu Eun
Keywords: Kafka ETL, high elasticity, O&M-free operation, low cost
Introduction: Alibaba Cloud Message Queue for Apache Kafka provides a fully managed service compatible with the Apache Kafka ecosystem. It resolves the long-standing pain points of the open-source product and has become an indispensable part of the big data ecosystem. As Kafka grew in popularity, first as a simple message bus and later as a data integration system, its reliable delivery made it a trustworthy data source for stream processing systems. In big data engineering, Kafka plays an important role in connecting upstream and downstream systems into data pipelines, and processing Kafka messages with stream processing frameworks has gradually become the norm.
Choosing a message stream processing framework
When it comes to stream computing, Storm, Spark Streaming, and Flink are the frameworks most commonly mentioned. All of them now have mature support for stream computing and plenty of production use cases. However, the barrier to entry is relatively high: you must learn the framework, migrate your business onto it, and then operate and maintain the stream computing system in production, which can be overly complicated for simple stream processing applications.
When interfacing with traditional streaming systems, data flows from one system into Kafka and then out into another, which prompted the Kafka community to ask: instead of handing data from one system to the next for processing, why not build a stream processing framework of its own? With this in mind, starting with version 0.10.0, Kafka not only serves as a reliable data source for every popular streaming framework, but also ships a powerful stream processing library, Kafka Streams, as part of its client libraries. With Kafka Streams, developers can read, process, and produce events inside their own applications without relying on an external processing framework.
However, because Kafka Streams is itself a Java client library, developers must package, deploy, and operate it themselves. It is also a purely open-source offering, so reliability and availability are not guaranteed and it cannot be used on demand. In addition, it requires stream-oriented programming, so the barrier to entry remains relatively high.
Major issues facing message stream processing frameworks
As the previous section shows, whether developers adopt a traditional streaming architecture or Kafka Streams, they face real problems. For the more than 70% of scenarios that only need simple stream processing, the drawbacks of the original approaches are amplified: customers still have to invest considerable and costly engineering effort, and the architecture remains complex. In general, there are four main problems:
1) High O&M cost: the R&D team writes the code itself and maintains it afterwards, which drives up operation and maintenance costs;
2) High technical cost: even for many lightweight or simple computing needs, a technology selection has to be made, and the cost of introducing a new component is too high;
3) Unpredictable learning cost: once a component is selected, the R&D team has to keep learning and maintaining it, which brings an unpredictable learning cost;
4) Reliability and availability cannot be effectively guaranteed when open-source components are chosen.
Facing these problems, Alibaba Cloud Message Queue for Apache Kafka launched a corresponding solution: Kafka ETL.
Alibaba Cloud's solution: Kafka ETL
Kafka ETL overview
Alibaba Cloud Message Queue for Apache Kafka has launched Kafka ETL, a low-cost, O&M-free stream computing component whose main feature is configuration-driven stream processing of messages. Kafka ETL mainly provides stream computing that does not depend on time windows; through configuration, plus a small amount of code where needed, it covers common data processing requirements such as format conversion, content enrichment, local aggregation, and routing/distribution. A short sketch of what such processing logic can look like follows the template list below.
A Kafka ETL task is organized as a directed acyclic graph. Topics serve as the storage between compute nodes, stateful computation is carried out on top of topics, and message dump (data transfer) is supported.
Kafka ETL supports the following templates:
1) Data cleaning: rule-based filtering;
2) Conversion templates: string replacement, adding prefixes and suffixes, string case conversion, splitting on whitespace;
3) Data enrichment template: data enrichment;
4) Splitting template: topic splitting;
5) Routing template: topic routing.
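To make the templates concrete, here is a minimal sketch of what such per-message processing logic might look like in Python 3 (the language used for ETL functions in the steps below). The entry point name `process` and its signature are assumptions for illustration; the actual templates provided by the console may expose a different interface.

```python
# Hypothetical sketch only: illustrates the kind of cleaning/conversion logic the
# templates encapsulate; the real template entry point may differ.

def process(key, value):
    """Process one message; return None to filter it out (rule filtering)."""
    text = value.strip()

    # Data cleaning template: rule filtering -- drop messages that do not mention "order".
    if "order" not in text.lower():
        return None

    # Conversion template: string replacement, case conversion, prefix/suffix.
    text = text.replace("|", ",")   # string replacement
    text = text.upper()             # string case conversion
    text = "ETL_" + text + "_DONE"  # add prefix and suffix

    return key, text
```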
Advantages of Kafka ETL
Compared with Storm, Spark Streaming, Flink, and Kafka Streams, Kafka ETL has four advantages:
1) Out of the box and O&M-free;
2) Low cost: Kafka ETL is currently in a free public beta phase;
3) Low code and fast to bring online: low learning cost, a one-stop experience, small technical investment, saving roughly 80% of the time cost;
4) Easy to monitor and troubleshoot, with comprehensive log information in the console.
Kafka ETL operation
The introduction to Kafka ETL's applications and advantages above shows that it is lightweight and low-cost. It is also simple to operate: an ETL task can be set up in just three steps.
1) Step 1: Create a task
Select the Kafka source instance and source topic, and the corresponding Kafka target instance and target topic, then configure the initial consumption position, the failure handling policy, and resource creation.
2) Step 2: Write the ETL master logic
The function language is Python 3. A variety of data cleaning and data conversion templates are provided, such as rule filtering, string replacement, and adding prefixes/suffixes, covering common needs.
3) Step 3: Configure the task's runtime and exception-handling parameters, then run it
Kafka ETL application scenarios
Given its features and advantages, Kafka ETL is mainly used in the following scenarios:
1) Data transfer (dump) scenarios: data formatting is supported to facilitate dumping data to downstream systems;
2) Stream processing scenarios: streaming computation over messages, mainly stream computing that does not depend on time windows;
3) Real-time behavioral computing scenarios, such as risk control, finance, and e-commerce, which require computing on user behavior in real time;
4) Other scenarios, including real-time reporting and automated operations.
Kafka ETL architecture analysis
The first three parts should have given you a basic understanding of Alibaba Cloud Kafka ETL. This section analyzes the Kafka ETL architecture to help you understand it more deeply. Kafka ETL is a one-stop solution for data transfer and computation built on Kafka Connect plus Function Compute.
In today's era of big data and cloud computing, a large, complex system is usually composed of many subsystems, each handling a specific task. These subsystems are typically developed by different teams, so there are naturally inconsistencies in the content and format of data across systems. When data flows between subsystems, it has to be reformatted to eliminate those differences. In addition, heterogeneous data collected from the various subsystems may need some processing and computation before being delivered to a data warehouse for later analysis. Two typical scenarios can be abstracted from this process: the data flow scenario and the data computing scenario.
The data flow scenario is mainly about how to transfer data between heterogeneous systems.
The data computing scenario is mainly about how to process and compute data while it flows.
These two main scenarios are described in detail below.
Data flow scenario
In a data flow scenario, you may need to import data from various relational and non-relational databases into a data warehouse, import MySQL data into Elasticsearch to improve the query experience, or import some of the data into a graph database. The main issues these scenarios face are:
1) How to copy data between different sources;
2) How to meet real-time transfer requirements.
For example, when a row changes in MySQL, you want the change to be reflected in Elasticsearch immediately; otherwise users will not be able to find the latest data. In addition, you need to ensure the high availability, scalability, and extensibility of the data copying itself.
The traditional solution is to build a dedicated data copy tool for each combination of source and target. Such an approach brings the following problems:
1) Workload: a dedicated tool has to be written for each scenario, which is a huge amount of work;
2) Serious business coupling: for example, to monitor price changes, a producer has to be added to every business system that changes prices; and if the upstream schema changes, the downstream layers have to modify their code, so the upstream layer must be aware of all of its downstream consumers.
Dedicated tools therefore do not seem feasible. So why not build one fully generic tool that supports copying data between arbitrary data sources? This sounds attractive, but in practice it is infeasible: a tool that generic is almost impossible to specify.
Kafka Connect was designed to solve exactly these heterogeneous data synchronization problems. The idea is to add a layer of messaging middleware between the data sources, through which all data is stored and distributed. The benefits of this are:
1) Asynchronous decoupling: every system communicates only with the messaging middleware;
2) The number of synchronization tools to develop drops from roughly n squared (one per source/target pair) to a linear 2×n (one source-side adapter and one sink-side adapter per system).
Kafka Connect connects the messaging system to external data sources. Depending on the direction of data flow, a connector is either a source connector or a sink connector. The principle is simple: a source connector parses the source data, converts it into messages in a standard format, and sends them to the Kafka broker through a Kafka producer; likewise, a sink connector consumes the corresponding topic through a Kafka consumer and delivers the data to the target system. Throughout this process, Kafka Connect handles task scheduling, interaction with the messaging system, automatic scaling, fault tolerance, and monitoring, greatly reducing duplicated work. Only the conversion between source-system data and messages, or between messages and target-system data, differs per data system, and for today's mainstream systems the major vendors already provide connector implementations.
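To make the division of labor clearer, the following is a conceptual sketch, written with the open-source kafka-python client, of what a sink-connector-style worker does: consume a topic and deliver each record to a target system. Real Kafka Connect connectors are plugins run by the Connect framework, not standalone scripts; the topic name, broker address, and delivery function here are illustrative assumptions.

```python
# Conceptual illustration of sink-connector behavior, not an actual Kafka Connect plugin.
from kafka import KafkaConsumer  # pip install kafka-python

def deliver_to_target(value: bytes) -> None:
    # Placeholder for writing to the target system (database, search index, ...).
    print(value.decode("utf-8"))

consumer = KafkaConsumer(
    "user_order_raw",                     # hypothetical source topic
    bootstrap_servers="localhost:9092",   # hypothetical broker address
    group_id="demo-sink-worker",
    auto_offset_reset="earliest",
)

for record in consumer:
    deliver_to_target(record.value)       # consume from Kafka, write to the target
```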
Alibaba Cloud Message Queue for Apache Kafka provides fully managed, O&M-free Kafka Connect for data synchronization between Message Queue for Apache Kafka and other Alibaba Cloud services. It currently supports connectors such as the MySQL source connector, OSS sink connector, MaxCompute sink connector, and Function Compute (FC) sink connector. To use these connectors for data synchronization, users only need a few configurations on the graphical interface of the Message Queue for Apache Kafka console to launch a connector task with one click.
Data computing scenario
Kafka Connect solves the problem of data synchronization between heterogeneous data sources, and although it also provides transforms to cover some data conversion requirements, it still lacks real-time computing power. To meet the real-time data processing requirements in these scenarios, many excellent processing tools have appeared, from the original Hadoop and Hive to Spark, Flink, and Kafka Streams, all of which provide corresponding component modules and upstream/downstream solutions.
However, these solutions all have problems to a greater or lesser extent. The main ones are:
- First, the processing frameworks are heavyweight and resource-hungry. The currently popular Spark and Flink, for example, both require a cluster to be built first, which takes considerable resources to run; cluster size is usually provisioned for peak traffic, so most of the time resources are wasted.
- Second, with so many frameworks, a technology still has to be selected according to actual needs, and a dedicated team or person may be required to operate and maintain it later, which means a large learning cost up front and a maintenance cost afterwards.
For simple, largely stateless computations, function computing can be a good choice. Function Compute on Alibaba Cloud is an event-driven, fully managed computing service. With Function Compute, users do not need to purchase or manage infrastructure such as servers; they simply write and upload code. Function Compute prepares the computing resources, runs tasks elastically and reliably, and provides log query, performance monitoring, and alerting. In other words, Function Compute brings computing power to many user scenarios in an easy-to-use way.
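As a rough illustration of the event-driven model, a function is typically a single entry point that the platform invokes per event; the handler shape below follows the common Python convention of handler(event, context), but the exact contract and event format depend on the runtime and trigger, so treat this as a shape sketch rather than a working integration.

```python
import json

# Shape sketch of an event-driven function: the platform calls handler(event, context)
# for each trigger; no servers are provisioned or managed by the user.
def handler(event, context):
    data = json.loads(event)   # event arrives as bytes/str; its structure depends on the trigger
    # ... stateless per-event processing would go here ...
    return json.dumps({"ok": True, "received": data})
```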
Alibaba Cloud Message Queue for Apache Kafka recently launched the Kafka ETL component, built on a Kafka + Kafka Connect + Function Compute architecture, which handles data transfer plus real-time computing well. It is lightweight, has a low learning cost and a short development cycle, scales resources dynamically, and is simple and fast to use.
In this Kafka + Kafka Connect + Function Compute cloud-native data application solution, Kafka Connect acts as the trigger for real-time processing tasks: it receives data sent to the message queue cluster in real time and forwards it to Function Compute, triggering the real-time data processing task. In the data flow stage, data from a large number of heterogeneous systems is collected into Kafka in various ways, and all subsequent processing is centered on Kafka. Besides keeping the data flow real time, Kafka Connect handles task scheduling, interaction with the messaging system, automatic scaling, fault tolerance, and monitoring, greatly reducing repetitive work. Once the data reaches Function Compute, the processing logic written by the user is triggered automatically to compute over the original message content. Finally, Function Compute can deliver the processed data to a user-specified target, such as Message Queue for Apache Kafka, or to MaxCompute for further data analysis. The entire task described above can be configured, created, and run purely through the graphical pages of the Kafka console on the cloud.
Example application scenario
Let's look at a concrete example of Kafka ETL in action. A typical usage scenario looks like this: collect logs from an e-commerce service system, store them in Kafka, process the log data, and finally deliver the processed data to two targets: one copy goes to MaxCompute for data analysis, and the other to Elasticsearch for log retrieval.
Now let's walk through how Message Queue for Apache Kafka does this, step by step:
1) Step 1: Collect the raw logs into a Message Queue for Apache Kafka topic
Mature open-source components such as Filebeat, Logstash, and Flume can be used to deliver the application's log messages to Kafka. Normally this step delivers the raw log information to Kafka; some simple transformation could be done here, but in this example it is not, and the raw logs are kept as-is. The raw logs may come from several related applications, with slightly different content and formats.
In this example, the order application generates a log containing the user ID, action, order ID, and current status.
The payment application generates another log that contains the same information, but in a slightly different format.
Both logs from these two subsystems are collected into a Kafka topic named user_order_raw. They end up as two messages in that topic, each with a null key and the original log content as the value. Because the source systems use different log formats, the two messages in this topic have different formats.
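The original article shows the two raw logs as screenshots, which are not reproduced here. Purely as a hypothetical illustration (the field names and formats below are invented), the two messages might look something like this:

```text
# order application (key=value style) -- hypothetical
userId=1001 action=create orderId=2001 status=created

# payment application (JSON style) -- hypothetical
{"user_id": "1001", "action": "pay", "order_id": "2001", "status": "success"}
```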
2) Step 2: Perform simple processing and computation on the messages in the topic
After the data arrives in the Kafka topic, Kafka Connect consumes the messages and delivers them to Function Compute, where the data is processed. The goal of this computation is to extract the UserId, Action, OrderId, and Status fields and convert their values to uppercase. All processed messages are then sent to MaxCompute for analysis, and the messages whose Action is "pay" are additionally filtered out and sent to Elasticsearch.
In this step, an ETL task is created on the graphical interface of the Kafka console. The user selects the source topic user_order_raw and then writes a short piece of code to process the data; Kafka ETL already provides templates that can be used with slight modification.
In this example, the user writes code that extracts UserId, Action, OrderId, and Status from the differently formatted logs and then routes every processed message to the target topics.
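The console code itself is not reproduced here. The following is a minimal sketch of that extraction-and-routing logic, written against the hypothetical log formats shown earlier; the actual entry point and routing API exposed by Kafka ETL may differ.

```python
# Hypothetical sketch of the ETL logic: normalize two log formats, uppercase the
# values, and decide which target topics each message should be routed to.
import json

def extract_fields(raw: str) -> dict:
    """Normalize either raw-log format into {UserId, Action, OrderId, Status}."""
    raw = raw.strip()
    if raw.startswith("{"):                       # payment application: JSON log
        record = json.loads(raw)
        fields = {
            "UserId": record["user_id"],
            "Action": record["action"],
            "OrderId": record["order_id"],
            "Status": record["status"],
        }
    else:                                         # order application: key=value log
        pairs = dict(item.split("=", 1) for item in raw.split())
        fields = {
            "UserId": pairs["userId"],
            "Action": pairs["action"],
            "OrderId": pairs["orderId"],
            "Status": pairs["status"],
        }
    return {k: v.upper() for k, v in fields.items()}  # convert values to uppercase

def route(raw: str) -> list:
    """Return (topic, value) pairs: every message goes to the unified topic,
    and pay actions additionally go to the pay-info topic."""
    fields = extract_fields(raw)
    value = json.dumps(fields)
    targets = [("user_order_processed", value)]
    if fields["Action"] == "PAY":
        targets.append(("user_order_pay_info", value))
    return targets
```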
3) Step 3: Deliver the processed messages to the target systems
Function Compute delivers the processed messages back to Kafka. After this step, all messages are routed to the target topic user_order_processed, which now contains two messages with null keys and values in the unified format.
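The original shows these two messages as a screenshot; a hypothetical rendering consistent with the sketch above would be:

```text
{"UserId": "1001", "Action": "CREATE", "OrderId": "2001", "Status": "CREATED"}
{"UserId": "1001", "Action": "PAY", "OrderId": "2001", "Status": "SUCCESS"}
```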
In addition, the messages whose Action is "pay" are routed to the topic user_order_pay_info, which contains one message with a null key.
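Again as a hypothetical rendering consistent with the sketch above:

```text
{"UserId": "1001", "Action": "PAY", "OrderId": "2001", "Status": "SUCCESS"}
```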
As you can see, the message format has been unified at this point.
In this example, all order-related messages in user_order_processed are delivered to MaxCompute for data analysis, and the payment information in user_order_pay_info is delivered to Elasticsearch for later search.
In this step, the corresponding Kafka Connect tasks can be created with one click to deliver the data to the corresponding destinations.
To summarize the whole process: in this example, all you need to do is configure an ETL task on the Message Queue for Apache Kafka console and write a small amount of processing code. Note that the data could instead be routed directly to MaxCompute and Elasticsearch after the second step, without going back to Kafka; in this example, however, the processed data is first sent back to Kafka and only then delivered to the target systems. That way a copy of the processed data is kept on the Kafka side, and the user can flexibly process it further or continue to deliver it to other third-party systems.
Advantages of Alibaba Cloud Message Queue for Apache Kafka
Finally, I would like to share some of the kernel-level differentiating advantages of Message Queue for Apache Kafka on Alibaba Cloud. During its development, the product has not only addressed ease of use and stability, but has also built its core competitiveness and differentiation at the kernel level.
Alibaba Cloud Message Queue for Apache Kafka supports two storage engines: cloud storage and local storage. A local topic stores data the way native Kafka does, retaining all features of open-source Kafka and remaining 100% compatible with it. Cloud storage is the focus of what follows: through its self-developed cloud storage engine, Message Queue for Apache Kafka completely resolves some deep-seated bugs of native Kafka, as well as problems that are hard to solve within Kafka's own architecture. It supports massive numbers of partitions, reduces storage cost through multi-replica optimization, and supports elastic scaling with seamless migration. These three features and their technical details are described next.
Support for massive partitions
In message engines, the common message storage methods are fragmented storage and centralized storage.
Fragmented storage, usually organized at the topic or partition granularity, has the advantage of a simple architecture and easy control of data retention per topic or partition. Kafka uses a partition-based fragmented storage architecture: when the number of partitions is small, reads and writes to disk are sequential, giving high read/write performance. Typically, an ordinary Kafka cluster can support partition counts up to the thousands. If the partition count keeps growing and most partitions receive read/write requests, this design turns sequential I/O into random I/O, and Kafka's read/write performance drops sharply.
Unlike fragmented storage, centralized storage writes all messages into the same commit log and then builds queues on top of topic and partition information, with the queues typically acting as indexes. Compared with fragmented storage, the main advantages of centralized storage are the large number of partitions it can support and the ease of controlling disk usage by deleting old commit logs. The self-developed cloud storage engine underlying Alibaba Cloud Message Queue for Apache Kafka uses the centralized storage model. Compared with Kafka's native storage, its main advantages are:
1) It solves the problem of performance dropping sharply as the partition count grows: compared with native Kafka, the supported partition count can reach the hundred-thousand level;
2) When a large number of partitions are written simultaneously, the self-developed cloud storage engine performs better than native Kafka's fragmented storage, and write latency is optimized to reduce spikes.
Multi-copy technology optimization
To ensure the high reliability and availability of a Kafka cluster, it is common to configure three replicas for every topic, so that when a machine goes down Kafka can quickly promote a new leader from the available follower replicas to take over from the leader on the failed machine. For its block storage devices, Message Queue for Apache Kafka uses Alibaba Cloud disks. A cloud disk is a block-level storage product that Alibaba Cloud provides for ECS (Elastic Compute Service), featuring low latency, high performance, durability, and high reliability; cloud disks use a distributed three-replica mechanism to guarantee data reliability and availability for ECS instances.
Against this background, configuring three replicas at the Kafka level on top of cloud disks actually results in nine copies of the data. At the same time, followers at the Kafka layer have to actively synchronize data from the leader, which consumes the cluster's computing and network resources and triples the user's traffic. But configuring a single replica at the Kafka level does not guarantee high availability either, because native Kafka cannot itself take advantage of the cloud disk's three-replica capability. How to make good use of the cloud disk's three replicas while reducing storage and network cost therefore becomes a major challenge.
Alibaba Cloud solves the storage cost and network cost problems through its self-developed cloud storage engine. The core idea is to introduce logical queues and physical queues in the storage engine. A logical queue is the concept exposed to users and can be understood as the partition seen by the client; a physical queue is where the data is actually stored. Logical queues and physical queues are bound together by a mapping. In the self-developed engine, every partition is logically single-replica, and data reliability and availability are guaranteed by the three-replica mechanism at the bottom of the cloud disk. Under normal conditions, data sent to a given logical partition is written to the corresponding physical queue according to the mapping; likewise, consumption pulls from the actual physical queue according to the mapping.
Now let's look at how the cloud storage engine achieves fault tolerance and high availability. For example, when the ECS instance of node 0 goes down, the QueueMapper can switch the mapping of logical queue 0 to the existing queue-3 on node 1, or add a new physical queue queue-4. From then on, messages destined for logical queue 0 are routed to queue-3 or queue-4. Sending is not affected, users can still send successfully, and the latest messages can still be consumed. There is one issue during this failover: the messages of logical queue 0 that live on node 0 temporarily cannot be consumed. For most application scenarios, however, a short delay in consuming part of the messages is not a big deal, as long as sending is not affected.
When the ECS instance of node 0 goes down, Alibaba Cloud's standby ECS mechanism quickly brings up a new machine to replace it, mounts the original cloud disk, and restores node 0's service within minutes. After node 0 recovers, the mapping of logical queue 0 is switched back to queue-0, and the system returns to its original state; sending and consuming then behave the same as in native Kafka.
In this way, storage cost is reduced to about one third of native Kafka's. At the same time, since the replica count at the Kafka level is 1, followers no longer have to synchronize data from the leader, and network traffic is likewise reduced to about one third of native Kafka's.
Horizontal scaling with second-level data rebalancing
Elastic scaling is one of the core capabilities of a message queue. Because Kafka server nodes are stateful, after several nodes are added, the queues of each topic need to be rebalanced so that the traffic clients send to or consume from the cluster is spread evenly across the back-end server nodes.
When Kafka scales out horizontally, there are two main ways to rebalance data:
The first is to add queues on the new brokers. The main pain points of this approach are:
1) The system state changes; some older multi-language clients then have to be restarted manually, otherwise they cannot consume the new partitions;
2) By Kafka's design, the number of partitions cannot be reduced, so the cluster cannot be scaled back in later.
The second approach is data migration. The main pain points of this approach are:
1) Replicating traffic causes a network storm and interferes with normal use;
2) Rebalancing time depends on the data volume; if the data volume is large, migration may take several days.
So, how does the cloud storage engine solve the above problem?
As mentioned above, Message Queue for Apache Kafka introduces two levels of queues: the first level is the logical queue, and the second level is the physical queue, i.e. Alibaba Cloud's self-developed cloud storage queue. The logical queue is exposed to users, while the physical queue stores the actual data. The mapping between logical and physical queues is maintained by the QueueMapper module, as shown in the following figure.
A logical queue can be composed of multiple physical queues, mapped segment by segment at switchover points so that ordering is preserved. During scale-out, the logical queue only has to be pointed at a physical queue on a new machine, so new messages are written directly to the new machine according to the new mapping. Likewise, when consuming, the actual physical queue is found from the segmented mapping and the messages are read from it.
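The following is a conceptual Python sketch of this segmented, two-level mapping; the real QueueMapper is internal to the cloud storage engine, and all names here are illustrative.

```python
# Conceptual model of a logical queue backed by segmented physical queues.
from dataclasses import dataclass, field
from typing import List, Tuple

@dataclass
class LogicalQueue:
    # Each segment is (start_logical_offset, physical_queue_id); segments are only
    # appended, so ordering within the logical queue is preserved.
    segments: List[Tuple[int, str]] = field(default_factory=list)
    next_offset: int = 0

    def scale_out(self, new_physical_queue: str) -> None:
        """Scale-out: point all future writes at a physical queue on a new node."""
        self.segments.append((self.next_offset, new_physical_queue))

    def physical_queue_for_write(self) -> str:
        return self.segments[-1][1]

    def physical_queue_for_read(self, logical_offset: int) -> str:
        """Consumption: find the segment that owns a given logical offset."""
        owner = self.segments[0][1]
        for start, queue in self.segments:
            if logical_offset >= start:
                owner = queue
        return owner

# Example: logical queue 0 starts on node 0; after scale-out, new writes land on
# node 1 without migrating the existing data.
q0 = LogicalQueue(segments=[(0, "node-0/queue-0")], next_offset=1000)
q0.scale_out("node-1/queue-3")
assert q0.physical_queue_for_write() == "node-1/queue-3"
assert q0.physical_queue_for_read(500) == "node-0/queue-0"
```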
As can be seen, the two-level, segmented queue mapping solves the message queue's scaling and migration problems, with the following advantages:
1) After the server is scaled out or in, the number of queues does not change and the system state stays unchanged;
2) No data migration is required during scale-out or scale-in, so the time-consuming migration step disappears and topic queue rebalancing completes within seconds;
3) It balances throughput and scalability without affecting the original performance of the message queue.
Conclusion
In Kafka streaming scenarios, traditional solutions such as Storm, Spark Streaming, Flink, and Kafka Streams are commonly used. However, developers run into many problems with these frameworks, especially for the more than 70% of requirements that are simple streaming scenarios: high O&M cost, high technical cost, unpredictable learning cost, and weak availability and reliability. To address these pain points, Alibaba Cloud Message Queue for Apache Kafka released the Kafka ETL component, an O&M-free stream computing component built on a Kafka + Kafka Connect + Function Compute architecture. It handles data transfer plus real-time computing well, with advantages such as being O&M-free, low cost, low code, and easy to monitor.
Kafka ETL is currently in a free public beta. Alibaba Cloud Message Queue for Apache Kafka also has differentiating advantages at the kernel level; see the link below for more details.
https://www.aliyun.com/product/kafka
If you have more questions, please join the Kafka DingTalk group.
Click here to check out the live replay for more details!