About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight functional computing. It adopts a compute-storage separation architecture, supports multi-tenancy, persistent storage, and multi-datacenter cross-region data replication, and offers strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

Founded in 2014, BIGO is a fast-growing technology company. Built on powerful audio/video processing, global real-time audio/video transmission, artificial intelligence, and CDN technologies, BIGO has launched a series of audio and video social and content products, including BIGO Live (live streaming) and Likee (short video), with nearly 100 million users worldwide. Its products and services cover more than 150 countries and regions.

Challenges

At first, BIGO's message flow platform mainly relied on open source Kafka for data support. As data volumes grew and products kept iterating, the data handled by the platform multiplied, and downstream businesses such as online model training, online recommendation, real-time data analysis, and real-time data warehousing placed higher demands on the platform's real-time performance and stability. Open source Kafka clusters struggled to support these massive data processing scenarios, and we had to invest ever more manpower to maintain multiple Kafka clusters, which became increasingly expensive, mainly in the following aspects:

  • Storage is coupled with the message queue service, so cluster expansion or partition rebalancing requires copying large amounts of data, which degrades cluster performance.
  • When a partition replica is not in the ISR (in-sync replica) state, a broker failure may cause data loss or leave the partition unable to serve reads and writes.
  • Manual intervention is required when a Kafka broker's disk fails or its disk usage grows too high.
  • Cross-region cluster synchronization relies on Kafka Mirror Maker (KMM), whose performance and stability fall short of expectations.
  • In catch-up read scenarios, PageCache pollution occurs easily, degrading read and write performance.
  • A Kafka broker can only host a limited number of topic partitions: the more partitions there are, the less sequential the disk I/O becomes and the worse the read/write performance.
  • As Kafka clusters grow, operation and maintenance costs rise sharply and daily operations demand substantial manpower; at BIGO, adding one machine to a Kafka cluster and rebalancing partitions takes 0.5 person-days, and removing one machine takes 1 person-day.

If we continued with Kafka, costs would keep rising: more machines and more operations manpower. At the same time, as the business grew, we had higher requirements for the messaging system: greater stability and reliability, easier horizontal scaling, and lower latency. To improve the real-time performance, stability, and reliability of the message queue and reduce operation and maintenance costs, we began to consider whether to do in-house secondary development on top of open source Kafka, or whether the community offered a better solution to the problems we encountered in maintaining Kafka clusters.

Why Pulsar

In November 2019, we started evaluating message queues, comparing the strengths and weaknesses of the mainstream messaging platforms against our needs. During this research we found that Apache Pulsar, a next-generation cloud-native distributed messaging and streaming platform, integrates messaging, storage, and lightweight functional computing. Pulsar offers seamless capacity expansion, low latency, and high throughput, and supports multi-tenancy and cross-region replication. Most importantly, Pulsar's storage-compute separation architecture solves Kafka's scaling problem perfectly: a Pulsar producer sends messages to a broker, which writes them to the second storage layer, BookKeeper, through the bookie client.

Pulsar adopts a layered architecture that separates storage from computing, supports multi-tenancy, persistent storage, and multi-datacenter cross-region data replication, and delivers strong consistency, high throughput, and low latency for scalable streaming data storage.

  • Horizontal scaling: Seamless scaling to hundreds or thousands of nodes.
  • High throughput: Proven in Yahoo!'s production environment, supporting pub-sub at millions of messages per second.
  • Low latency: Low latency (less than 5 ms) can be maintained even with large message volumes.
  • Persistence mechanism: Pulsar’s persistence mechanism, built on Apache BookKeeper, implements read and write separation.
  • Read/write separation: BookKeeper’s read/write separation IO model maximizes sequential disk write performance and is relatively friendly to mechanical hard disks. There is no limit to the number of topics supported by a single Bookie node.

To understand Apache Pulsar further and evaluate whether it could truly meet the large-scale message pub-sub needs of our production environment, we began a series of stress tests in December 2019. Since we were using mechanical hard drives rather than SSDs, we ran into some performance issues during the tests. With the help of StreamNative, we carried out a series of performance tunings on both the broker and BookKeeper, which improved Pulsar's throughput and stability.

After three to four months of pressure testing and tuning, we felt that Pulsar was fully capable of addressing the various issues we encountered with Kafka and launched Pulsar in a test environment in April 2020.

Apache Pulsar at BIGO: Pub-sub consumption pattern

In May 2020, we officially started using Pulsar clusters in our production environment. Pulsar's role at BIGO mainly follows the classic pub-sub production and consumption model. On the producer side are the Baina service (a data-receiving service implemented in C++), Kafka Mirror Maker, Flink, and clients in other languages such as Java, Python, and C++, all writing data to topics. On the consumer side are Flink and Flink SQL jobs, as well as consumer clients in other languages.

Downstream, the business scenarios we serve include real-time data warehousing, real-time ETL (extract-transform-load, the process of extracting, transforming, and loading data from a source to a destination), real-time data analysis, and real-time recommendation. Most business scenarios use Flink to consume data from Pulsar topics and process the business logic; the other client languages used are mainly C++, Go, and Python. After being processed by the respective business logic, the data is written to third-party storage services such as Hive, Pulsar topics, ClickHouse, HDFS, and Redis.

Pulsar + Flink real-time streaming platform

At BIGO, we built a real-time streaming platform with Flink and Pulsar. Before introducing the platform, let's take a look at the inner workings of the Pulsar Flink Connector. In the Pulsar Flink Source/Sink API, there is a Pulsar topic upstream, a Flink job in the middle, and a Pulsar topic downstream. How do we consume the upstream topic, process the data, and write the results to the downstream Pulsar topic?

On the source side, first initialize a StreamExecutionEnvironment and set the related configuration, such as property values and the topic. Then create a FlinkPulsarSource object, passing in the serviceUrl (broker list), the adminUrl (admin address), the topic data serialization schema, and finally the Properties; this lets you read the data in the Pulsar topic. Using the sink is just as simple: create a FlinkPulsarSink, specify the target topic, specify the TopicKeyExtractor as the key, and call addSink to write the data out. This production and consumption model is simple and much like Kafka's; a minimal sketch follows below.
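
The original article shows this code as an image; below is a minimal sketch of the same Source/Sink pattern. The constructor shapes (serviceUrl, adminUrl, schema, Properties) follow the description above, but exact signatures vary across pulsar-flink connector versions, so treat the details as assumptions rather than a definitive implementation.

```java
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;

public class PulsarPubSubJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String serviceUrl = "pulsar://broker-host:6650"; // broker list
        String adminUrl   = "http://broker-host:8080";   // admin address

        Properties props = new Properties();
        props.setProperty("topic", "persistent://public/default/input-topic");

        // Source: read data from the upstream Pulsar topic.
        FlinkPulsarSource<String> source =
                new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
        DataStream<String> stream = env.addSource(source);

        // ... business logic on `stream` goes here ...

        // Sink: write the processed data to the downstream Pulsar topic.
        // (Some connector versions additionally take a TopicKeyExtractor, as
        // mentioned in the text, to route records by key.)
        FlinkPulsarSink<String> sink = new FlinkPulsarSink<>(
                serviceUrl, adminUrl,
                Optional.of("persistent://public/default/output-topic"),
                props, new SimpleStringSchema());
        stream.addSink(sink);

        env.execute("pulsar-flink-pubsub");
    }
}
```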

How are Pulsar topic consumption and Flink linked together? When a FlinkPulsarSource is created, a new Reader object is created for each partition of the topic. The Pulsar Flink Connector creates readers through Pulsar's Reader API, which is backed by a non-durable cursor. A reader effectively commits each piece of data as soon as it is read, which is why reader subscriptions show no backlog information in monitoring.
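
For intuition about what a non-durable cursor looks like, here is a small sketch using the plain Pulsar Java client's Reader API (not the connector's internal code); the topic and service URL are placeholders.

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class ReaderExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://broker-host:6650")
                .build();

        // A Reader uses a non-durable cursor: it tracks its own position,
        // never acknowledges messages, and therefore shows no backlog in
        // subscription monitoring.
        Reader<byte[]> reader = client.newReader()
                .topic("persistent://public/default/input-topic-partition-0")
                .startMessageId(MessageId.earliest)
                .create();

        while (true) {
            Message<byte[]> msg = reader.readNext();
            // process msg; the cursor position simply advances as data is read
        }
    }
}
```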

In Pulsar 2.4.2, when a topic is subscribed to with a non-durable cursor, the data written by producers is not stored in the broker cache, so a large number of read requests fall through to BookKeeper and read efficiency drops. BIGO fixed this issue in Pulsar 2.5.1.

Given that a reader subscribes to a Pulsar topic and consumes its data, how does Flink ensure exactly-once semantics? The Pulsar Flink Connector uses a separate subscription backed by a durable cursor. When Flink triggers a checkpoint, the Pulsar Flink Connector checkpoints the reader state (including the consumption position of each Pulsar topic partition) to a file, memory, or RocksDB. When the checkpoint completes, a Notify Checkpoint Complete notification is issued. Upon receiving this notification, the Pulsar Flink Connector commits the current reader offset (message ID) to the Pulsar broker under the separate SubscriptionName; only at this point is the consumption offset actually recorded.

After the offset commit completes, the Pulsar broker stores the offset information (represented in Pulsar as a cursor) in the underlying distributed storage system, BookKeeper. The benefit is that when the Flink task restarts, there are two layers of recovery protection. The first is recovery from the checkpoint: the message ID of the last consumed message is read directly from the checkpoint, and the stream resumes consuming from that message ID. If recovery from the checkpoint is not possible, the restarted Flink task consumes from Pulsar at the offset of the last commit under that SubscriptionName. This prevents a corrupted checkpoint from stopping the entire Flink task from starting.

The Checkpoint process is shown as follows.

Checkpoint N completes and Notify Checkpoint Complete fires; after a certain interval, checkpoint N+1 runs and its Notify Checkpoint Complete fires as well. At that point the durable cursor performs a commit, and the commit lands on the server side of the Pulsar topic. This ensures exactly-once checkpointing and keeps messages alive under the connector's own subscription.
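
On the Flink side this mechanism only takes effect when checkpointing is enabled. A minimal configuration sketch (the interval and state-backend path are illustrative values, not BIGO's settings):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger checkpoint N, N+1, ... every 60 seconds with exactly-once
        // semantics; on each Notify Checkpoint Complete the connector commits
        // the reader offsets to Pulsar under its durable subscription.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // The reader state (per-partition consumption positions) can be kept
        // in memory, in files, or in RocksDB, as described above.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}
```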

What problem does topic/partition discovery solve? When a Flink task consumes a topic, it needs to discover automatically any partitions added to that topic. How does the Pulsar Flink Connector achieve this? Each Task Manager contains multiple reader threads, and a hash function maps topic partitions to Task Managers. When a new partition is added to a topic, it is mapped to one of the Task Managers; once that Task Manager discovers the new partition, it creates a reader to consume the new data. Users can set the partition.discovery.interval-millis parameter to control the discovery frequency, as in the sketch below.
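
A hedged sketch of setting the discovery interval through the source properties, reusing the env, serviceUrl, and adminUrl variables (and the assumed constructor shape) from the earlier Source/Sink sketch; the key name follows the parameter named above.

```java
Properties props = new Properties();
props.setProperty("topic", "persistent://public/default/input-topic");
// Check the topic for newly added partitions every 30 seconds; when one is
// found, the owning Task Manager creates a new reader for it.
props.setProperty("partition.discovery.interval-millis", "30000");

FlinkPulsarSource<String> source =
        new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
env.addSource(source);
```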

To lower the barrier for Flink to consume Pulsar topics and to let the Pulsar Flink Connector support richer new Flink features, the BIGO message queue team added Pulsar Flink SQL DDL (Data Definition Language) and Flink 1.11 support to the Pulsar Flink Connector. Previously, the official Pulsar Flink SQL only supported the Catalog approach, which made it inconvenient to consume and process data in Pulsar topics via DDL. In the BIGO scenario, most topic data is stored in JSON format and the JSON schema is not registered in advance, so a topic can only be consumed after its DDL is declared in Flink SQL. For this scenario, BIGO did secondary development on top of the Pulsar Flink Connector, providing a code framework for consuming, parsing, and processing Pulsar topic data in the form of Pulsar Flink SQL DDL, sketched below.
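
The article presents this framework as a figure; below is a hedged sketch of the idea using Flink 1.11's TableEnvironment. The field names (rip, rtime, uid) and options (topic, service-url, admin-url) come from the text, but the exact option keys of BIGO's in-house connector may differ, and the Hive target table name is hypothetical.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PulsarFlinkSqlDdlJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Step 1: declare the Pulsar topic as a table via DDL; the reader
        // deserializes each JSON message according to this schema.
        tEnv.executeSql(
            "CREATE TABLE test_flink_sql (" +
            "  rip   STRING," +
            "  rtime STRING," +
            "  uid   BIGINT" +
            ") WITH (" +
            "  'connector'   = 'pulsar'," +
            "  'topic'       = 'persistent://public/default/test_flink_sql'," +
            "  'service-url' = 'pulsar://broker-host:6650'," +
            "  'admin-url'   = 'http://broker-host:8080'," +
            "  'format'      = 'json'" +
            ")");

        // Steps 2 and 3: conventional SQL logic (field extraction, joins,
        // statistics), then insert the extracted fields into a Hive table.
        tEnv.executeSql(
            "INSERT INTO hive_catalog.db.stats_hourly " +
            "SELECT rip, rtime, uid FROM test_flink_sql");
    }
}
```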

In the first step, configure consumption of the Pulsar topic: declare the topic's schema in DDL form (fields such as rip, rtime, and uid), followed by the basic consumption configuration such as the topic name, service-url, and admin-url. When the underlying reader reads a message, it deserializes the message according to the DDL and stores the data in the test_flink_sql table. The second step is conventional logical processing (such as field extraction and table joins) to obtain the relevant statistics or other results, which are then written to HDFS or other systems. Third, the corresponding fields are extracted and inserted into a Hive table. Since Flink 1.11 has better Hive write support than 1.9.1, BIGO did another round of API compatibility work and a version upgrade so that the Pulsar Flink Connector supports Flink 1.11. The real-time streaming platform BIGO built on Pulsar and Flink is mainly used in real-time ETL processing scenarios and AB-test scenarios.

Real-time ETL processing scenarios

The real-time ETL scenario mainly uses the Pulsar Flink Source and Pulsar Flink Sink. In this scenario there are hundreds or even thousands of Pulsar topics, each with its own schema. We need to apply general processing to these hundreds of topics, such as field conversion, fault tolerance, and writing to HDFS. Each topic corresponds to a table in HDFS, so hundreds of topics map to hundreds of HDFS tables, each with different fields. This is the real-time ETL scenario we face.

The difficulty in this scenario is the sheer number of topics: maintaining one Flink task per topic would be far too costly. We initially wanted to sink Pulsar topic data directly to HDFS through the HDFS Sink Connector, but handling the internal logic proved very cumbersome. In the end we decided to use one or a few Flink tasks to consume the hundreds of topics, each with its own schema: subscribe to all topics directly with readers, parse the schemas in post-processing, and write the processed data to HDFS.

As the job ran, we found a problem with this scheme: load imbalance between operators. Some topics have high traffic and others low traffic; if they are mapped to Task Managers by random hashing, some Task Managers end up handling far more traffic than others, causing severe congestion on those machines and slowing down the whole Flink pipeline. We therefore introduced the concept of slot groups, grouping topics by traffic. Traffic is reflected in a topic's partition count: high-traffic topics are created with more partitions, and vice versa. During grouping, low-traffic topics go into one group and high-traffic topics into a separate group, which isolates resources and keeps overall traffic balanced across Task Managers; a sketch of the idea follows.
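
A hedged sketch of the grouping idea using Flink's slotSharingGroup (this is not BIGO's actual implementation, and it reuses the env, serviceUrl, adminUrl, and assumed constructor shape from the first sketch).

```java
// High-traffic topics get their own slot sharing group so their readers do
// not share slots with, and congest, the low-traffic group.
DataStream<String> highTraffic = env
        .addSource(new FlinkPulsarSource<>(serviceUrl, adminUrl,
                new SimpleStringSchema(), highTrafficTopicProps))
        .slotSharingGroup("high-traffic");

DataStream<String> lowTraffic = env
        .addSource(new FlinkPulsarSource<>(serviceUrl, adminUrl,
                new SimpleStringSchema(), lowTrafficTopicProps))
        .slotSharingGroup("low-traffic");

// Schema parsing and the HDFS write are then applied within each group.
```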

AB-test scenarios

The real-time data warehouse needs to provide hourly and daily tables for data analysts and recommendation algorithm engineers to query. Simply put, the app contains many tracking events of various types, which are reported to the server. If the raw events were exposed directly to the business side, different business users would have to access different raw tables, extract data along different dimensions, and perform correlation computations across tables. Frequent extraction and join operations on the underlying base tables waste a lot of computing resources, so we extract the dimensions users care about from the base tables in advance and combine multiple event streams into one or more wide tables, covering 80% to 90% of the recommendation and data analysis tasks mentioned above.

The real-time data warehouse also needs real-time intermediate tables. Our solution is to use Pulsar Flink SQL to parse the consumed data into the corresponding tables for topics A through K. The usual way to aggregate multiple tables into one is a JOIN, for example joining tables A through K on uid to form one very wide table. However, joining many tables in Flink SQL is inefficient, so BIGO uses a union instead of a join to build a wide view, materializes the view hourly, writes it to ClickHouse, and serves real-time queries to the downstream business side. Using union instead of join to speed up table aggregation brings intermediate table output down from hour level to minute level; a sketch of the pattern follows.
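
A hedged sketch of the union-instead-of-join pattern, reusing the tEnv from the DDL sketch above. Every per-topic table is padded to the same wide schema with NULLs, UNION ALL'd into one view, and then collapsed per uid and hour with simple aggregates; all table and field names are illustrative, and the ClickHouse sink table is assumed to be declared elsewhere via DDL.

```java
tEnv.executeSql(
    "CREATE VIEW wide_view AS " +
    "SELECT uid, `hour`, field_a, CAST(NULL AS STRING) AS field_k FROM table_a " +
    "UNION ALL " +
    "SELECT uid, `hour`, CAST(NULL AS STRING) AS field_a, field_k FROM table_k");

// Collapse the padded rows: one output row per (uid, hour), written to ClickHouse.
tEnv.executeSql(
    "INSERT INTO clickhouse_wide_hourly " +
    "SELECT uid, `hour`, MAX(field_a) AS field_a, MAX(field_k) AS field_k " +
    "FROM wide_view " +
    "GROUP BY uid, `hour`");
```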

Producing the daily table may also require joining an offline table stored in Hive or on other storage media, that is, joining a stream table with an offline table. A direct join would store a large amount of intermediate state in checkpoints, so we optimized along another dimension.

The real-time part is similar to the hourly table: Pulsar Flink SQL consumes each topic and converts it into the corresponding table, the tables are unioned, and the unioned result is written to HBase on a daily basis (HBase is introduced here to replace the join).

The offline part joins in the offline data: Spark aggregates the offline Hive tables (such as A1, A2, and A3), and the aggregated data is written to HBase under a carefully designed row key. The data converges as follows: suppose the real-time path fills the first 80 columns of the wide table under a given key; the data computed by the Spark task, written under the same key, fills the last 20 columns, forming a large wide table in HBase. The final data is then read out of HBase and written to ClickHouse for upper-layer users to query. This is the main architecture of the AB-test pipeline.
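
A hedged sketch of the row-key convergence idea, using the plain HBase client API; the table name, column families, and fields are all illustrative, and in practice the two puts are issued by the Flink and Spark jobs respectively.

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class AbTestWideRowSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("abtest_wide_table"))) {

            // Both paths write under the same row key, so HBase assembles the
            // wide row without any join.
            byte[] rowKey = Bytes.toBytes("uid_12345");

            // Real-time (Flink) path: fills the first ~80 columns, modelled
            // here as column family "rt".
            Put realtime = new Put(rowKey);
            realtime.addColumn(Bytes.toBytes("rt"), Bytes.toBytes("field_01"), Bytes.toBytes("v1"));
            table.put(realtime);

            // Offline (Spark) path: fills the last ~20 columns, modelled here
            // as column family "off".
            Put offline = new Put(rowKey);
            offline.addColumn(Bytes.toBytes("off"), Bytes.toBytes("field_81"), Bytes.toBytes("v2"));
            table.put(offline);

            // The completed wide rows are later read out of HBase and written
            // to ClickHouse for upper-layer queries.
        }
    }
}
```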

Business benefits

Since its launch in May 2020, Pulsar has been running stably, processing tens of billions of messages per day with an inbound byte rate of 2-3 GB/s. Apache Pulsar's high throughput, low latency, and high reliability have greatly improved BIGO's message processing capacity, reduced message queue operation and maintenance costs, and saved nearly 50% of hardware costs. Currently, we have hundreds of Pulsar broker and bookie processes deployed on dozens of physical hosts, with broker and bookie co-located on the same nodes. We have migrated ETL from Kafka to Pulsar and are gradually migrating the production workloads that consume Kafka clusters (such as Flink, Flink SQL, and ClickHouse) to Pulsar. Traffic on Pulsar will keep growing as more services migrate.

Our ETL jobs involve more than 10,000 topics, with an average of three partitions per topic and a three-replica storage strategy. With Kafka, as the partition count grows, sequential disk reads and writes degenerate into random reads and writes, and read/write performance deteriorates severely. Apache Pulsar's layered storage design can easily support millions of topics, elegantly supporting our ETL scenarios.

Future plans

BIGO has done a lot of work on Pulsar broker load balancing, broker cache hit ratio optimization, broker monitoring, BookKeeper read/write performance optimization, BookKeeper disk IO performance optimization, and the integration of Pulsar with Flink and Flink SQL. This work has improved Pulsar's stability and throughput and lowered the barrier to combining Flink with Pulsar, laying a solid foundation for promoting Pulsar.

Going forward, we will expand the use of Pulsar in BIGO scenarios and help the community further optimize and improve Pulsar, as follows:

  1. Develop new features for Apache Pulsar, such as support for Topic Policy related features.
  2. Migrate more tasks to Pulsar. This involves two aspects: migrating tasks that previously ran on Kafka to Pulsar, and connecting new business directly to Pulsar.
  3. Adopt KoP (Kafka on Pulsar) to ensure a smooth data migration. Because BIGO has many Flink tasks consuming Kafka clusters, we want a KoP layer directly on Pulsar to simplify the migration process.
  4. Continuous performance optimization for Pulsar and BookKeeper. BIGO has high requirements on system reliability and stability due to high traffic in the production environment.
  5. Continue to optimize BookKeeper’s IO stack. The underlying storage of Pulsar is an IO intensive system. Only by ensuring high IO throughput at the bottom can the upper throughput be improved and the performance be stable.

About the author

Hang Chen, Apache Pulsar Committer, is the team leader of the BIGO big data messaging platform team, responsible for creating and developing a centralized publish-subscribe messaging platform for large-scale services and applications. He introduced Apache Pulsar to the BIGO messaging platform and connected it with upstream and downstream systems such as Flink, ClickHouse, and other real-time recommendation and analytics systems. He is currently responsible for Pulsar performance tuning, new feature development, and Pulsar ecosystem integration.

Further reading

  • Apache Pulsar performance Tuning in BIGO (Part 1)
  • Apache Pulsar performance Tuning in BIGO (Part 2)
  • Apache Pulsar’s landing practice in the field of energy Internet
