Author: Xiangzi, R&D engineer on the Individual Push platform

I. Business background

The message queue plays a very important role in the overall system.

II. Priority queue scheme based on Kafka

For the scenario above, Individual Push designed the first version of its priority queue scheme on top of Kafka. Kafka is a high-performance, distributed messaging system developed at LinkedIn, and it is widely used at companies such as Twitter for log collection and online/offline message distribution.

Architecture

  1. For a given priority, messages are stored in different Topics along the task dimension (a task is a single push task). One task is written to only one Topic, while one Topic can store messages from multiple tasks.

  2. According to the priority quota (e.g., 6:3:1), the consumption module fetches the corresponding number of messages for each priority, and polls the tasks within the same priority in round-robin fashion. This lets higher-priority users send messages faster while still leaving lower-priority users a share of the sending capacity.
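Below is a minimal sketch of this quota-based consumption loop. It assumes one Kafka consumer per priority and hypothetical topic names (push-high, push-mid, push-low); for simplicity the quota is enforced through Kafka's max.poll.records setting, which caps how many messages each poll can return, rather than the exact mechanism of the original system.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class PriorityQuotaPolling {

    // One consumer per priority; max.poll.records caps each round at the quota.
    static KafkaConsumer<String, String> consumerFor(String topic, String group, int quota) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", group);
        props.put("max.poll.records", Integer.toString(quota));
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(topic));
        return consumer;
    }

    public static void main(String[] args) {
        // Hypothetical topics for the three priorities with a 6:3:1 quota.
        KafkaConsumer<String, String> high = consumerFor("push-high", "g-high", 6);
        KafkaConsumer<String, String> mid  = consumerFor("push-mid",  "g-mid",  3);
        KafkaConsumer<String, String> low  = consumerFor("push-low",  "g-low",  1);

        while (true) {
            // Each round delivers at most 6 high, 3 mid, and 1 low message,
            // so low-priority traffic is throttled but never starved.
            for (KafkaConsumer<String, String> consumer : List.of(high, mid, low)) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    deliver(record);
                }
            }
        }
    }

    static void deliver(ConsumerRecord<String, String> record) {
        // hand the message off to the push-sending module
    }
}
```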

Problems with the Kafka scheme

As Individual Push's business continued to grow and the number of integrated apps increased, the first version of the priority scheme gradually exposed some problems:

  1. When apps of the same priority push more and more tasks at the same time, the messages of newly arriving tasks are delayed because the messages of earlier tasks are still queued ahead of them. As shown in the figure below, when task1 has a very large number of messages, taskN stays in a waiting state until task1 has been fully consumed.
  2. Kafka throughput drops dramatically as the number of Topics increases from 64 to 256. Kafka keeps separate physical files for each Topic and partition; as the number of Topics grows, messages are flushed to disk in an increasingly scattered pattern and disk I/O contention becomes severe, so we could not mitigate problem 1 simply by adding more Topics.

This led to a new round of technology selection: we needed a system that could create a large number of Topics without falling behind Kafka in throughput. After a period of research, Apache Pulsar caught our attention.

III. Why Pulsar

Apache Pulsar is an enterprise-grade distributed messaging system, originally developed at Yahoo, open-sourced in 2016, and graduated as a top-level project of the Apache Software Foundation in September 2018. Pulsar had been running in Yahoo's production environment for more than three years, serving Mail, Finance, Sports, Flickr, the Gemini Ads Platform, and Sherpa (Yahoo's key-value store).

Architecture

Number of Topics: Pulsar can scale to millions of Topics while still maintaining good performance. Topic scalability depends on how Topics are organized and stored internally. Pulsar's data is stored on bookies (BookKeeper servers): messages from different Topics that are being written are sorted in memory and then aggregated into large files, so only a small number of file handles is needed. Meanwhile, a bookie's I/O relies far less on the file system's page cache, which is why Pulsar can support a very large number of Topics.

Pulsar supports three consumption models: Exclusive, Shared, and Failover.

Exclusive: a subscription can have only one consumer attached to it. If another consumer tries to subscribe to the Topic with the same subscription name, it is rejected.

Shared: multiple consumers can attach to the same Topic under one subscription, and messages are distributed to the consumers in turn. If a consumer crashes or disconnects voluntarily, the unacknowledged (un-ACKed) messages that were dispatched to it are rescheduled and delivered to the other consumers.

Failover: a subscription has only one active consumer but can have multiple backup consumers. If the active consumer fails, a backup consumer takes over; there are never two active consumers at the same time.

Exclusive and Failover subscriptions allow only one consumer per subscription on each Topic. Both modes consume messages in Topic-partition order, and they are best suited for stream use cases that require strict message ordering.

Shared allows multiple consumers per Topic partition; each consumer in the same subscription receives only a portion of the partition's messages. Shared works best for queue use cases that do not require message ordering, and the number of consumers can be scaled up as needed.
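As a concrete illustration, here is a minimal Shared-mode consumer built with the Pulsar Java client; the service URL, topic, and subscription name are assumptions for the sketch, not values from our production setup.

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SharedConsumerDemo {
    public static void main(String[] args) throws Exception {
        // Assumed broker address; replace with your cluster's service URL.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Multiple processes can run this same code: in Shared mode they all
        // attach to the subscription and split the Topic's messages among them.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/task-demo") // hypothetical topic
                .subscriptionName("push-workers")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer.receive();
            try {
                // process the message, then acknowledge it explicitly
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // redeliver the message to another consumer in the subscription
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}
```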

Storage: Pulsar uses Apache BookKeeper as its storage layer. BookKeeper is a distributed storage system optimized for real-time workloads that offers scalability, high availability, and low latency; for details, please refer to the BookKeeper website.

Segment: BookKeeper uses the segment (called a ledger inside BookKeeper) as its basic storage unit. Everything, from segments down to individual messages, is spread evenly across the nodes of the BookKeeper cluster. This mechanism ensures that both data and load are evenly distributed.

Pulsar and Kafka both build Topic storage on the logical concept of the partition. The fundamental difference is that Kafka's physical storage unit is the partition: each partition must be stored in its entirety (as a directory) on a single broker. In Pulsar, the physical storage unit is the segment, so each partition is broken up and distributed evenly across multiple bookie nodes.

The direct effect is that the size of a Kafka partition is limited by the storage of a single broker, while a Pulsar partition can use the storage capacity of the entire cluster.

When a partition reaches its maximum capacity and needs to be expanded, Kafka may require adding new storage nodes and rebalancing partition data across nodes.

Pulsar only needs to add a new bookie node. Because the new node has the most free space, it is preferentially selected and receives more of the new data; the expansion involves no copying or moving of existing data.

Broker failure: Pulsar has the same advantage when a single node fails. If one of the Pulsar broker nodes fails, other brokers can quickly take over its Topics without copying any Topic data, because brokers are stateless. If a storage node (bookie) fails, the other bookies in the cluster concurrently read data from multiple bookie nodes in the background and automatically recover the failed node's data; front-end services are not affected.

Bookie failure: replica repair in Apache BookKeeper is a fast, many-to-many repair at the segment (or even entry) level. Only the necessary data is copied, which is far finer-grained than replicating an entire Topic partition. As shown in the figure below, when an error occurs on Bookie 1, Apache BookKeeper can read the messages of Segment 4 from both Bookie 3 and Bookie 4 and repair Segment 4 on Bookie 1. All replica repair is done in the background and is transparent to brokers and applications.

When a bookie node fails, BookKeeper automatically adds a new available bookie to replace it. The failed bookie's data is restored in the background, broker writes are never interrupted, and the availability of Topic partitions is not sacrificed.

IV. Priority queue scheme based on Pulsar

In terms of design, the new scheme is not much different from the Kafka one. In the new scheme, however, we leveraged Pulsar's features to solve the problems we had encountered with Kafka:

  1. Topics are generated dynamically per task, so an incoming task never waits behind the message backlog of other tasks (see the producer sketch after this list).
  2. Medium- and high-priority tasks each use a dedicated Topic, while low-priority tasks share N Topics.
  3. Within the same priority, tasks are polled round-robin for messages; once a priority's quota is used up, consumption moves on to the next priority.
  4. Within the same priority, each task's quota can be adjusted dynamically, so a task can read more messages within the same scheduling opportunity.
  5. In Shared mode, consumers can be added and removed dynamically without triggering a rebalance.
  6. BookKeeper makes it more flexible to add storage resources.
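To illustrate point 1, the following is a minimal sketch of producing to a per-task Topic with the Pulsar Java client. With default broker settings, producing to a new Topic name creates the Topic on the fly; the task ID, namespace, and service URL here are illustrative assumptions.

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PerTaskProducerDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed broker address
                .build();

        String taskId = "task-123"; // hypothetical single-push task ID
        // With Topic auto-creation enabled (the default), sending to a new
        // Topic name creates it on the fly, so each task gets its own Topic
        // without any pre-provisioning step.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/" + taskId)
                .create();

        producer.send("hello push".getBytes());
        producer.close();
        client.close();
    }
}
```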

V. Other practices of Pulsar

  1. Subscriptions are independent of one another; to consume a Topic's messages more than once, you must subscribe with a different subscriptionName. As new subscriptionNames are added, however, their backlogs keep accumulating.
  2. If a Topic has no subscription, messages sent to it are deleted by default. So if the producer may send before the consumer receives, make sure a subscription exists on the Topic before producing (even if the subscriber is closed afterwards); otherwise, messages sent during that window go unprocessed. A minimal guard is sketched after this list.
  3. If no producer sends to a Topic and no consumer subscribes to it, the Topic is automatically deleted after a certain period.
  4. TTL and similar settings in Pulsar apply to an entire namespace, not to a single Topic.
  5. All of Pulsar's keys are created under the ZooKeeper root directory; it is recommended to add an overall root node name during initialization.
  6. In Pulsar's current Java API design, messages must be acknowledged explicitly by default, unlike in Kafka.
  7. The storage size on the Pulsar dashboard and the storage size in Prometheus (which includes replica size) are not the same concept.
  8. Set dbStorage_rocksDB_blockCacheSize large enough. When the message volume is large and there is a big backlog, the default size (256 MB) makes reads take too long and slows down consumption.
  9. Use multiple partitions to improve throughput.
  10. In the event of a system exception, proactively fetch stats and stats-internal, which contain a lot of useful data.
  11. If the message volume of a single Topic is very large, set backlogQuotaDefaultLimitGB large enough (default 10 GB) to avoid the producer being blocked under the default producer_request_hold policy; alternatively, choose a backlogQuotaDefaultRetentionPolicy that fits the actual business.
  12. Proactively choose a backlog quota policy based on the actual business scenario.
  13. If a read time shows as null in Prometheus, the data was probably served directly from cache: Pulsar reads messages first from the write cache, then from the read cache; only when both miss does it look up the entry's location in RocksDB and then read the entry from the entry log file.
  14. When writing messages, Pulsar writes to the journal and the write cache simultaneously; the write cache asynchronously flushes to entry log files and RocksDB. It is therefore recommended to use SSDs for the journal disks if resources allow.
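For point 2 above, a minimal guard, sketched here under assumed names, is to create the durable subscription before any producer starts sending, for example by subscribing once and closing the consumer:

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class EnsureSubscriptionDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed broker address
                .build();

        // Subscribing creates a durable subscription (a durable cursor) on the
        // Topic; closing the consumer does NOT delete the subscription, so
        // messages produced afterwards are retained until they are consumed.
        client.newConsumer()
                .topic("persistent://public/default/task-demo") // hypothetical topic
                .subscriptionName("push-workers")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe()
                .close();

        // ... now it is safe for producers to start sending ...
        client.close();
    }
}
```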

VI. Summary

At present, Individual Push's priority-middleware transformation scheme has been in trial operation in part of our production services, and we are still keeping a close eye on Pulsar's stability. As a project open-sourced only in 2016, Pulsar has many attractive features that address its competitors' weak points, such as cross-region replication, multi-tenancy, scalability, and read/write isolation. Although it is not yet widely adopted in the industry, judging by its existing features Pulsar is a serious candidate to replace Kafka. We also ran into some problems while using Pulsar, and we would like to thank Zhai Jia and Guo Sijie (both core engineers at StreamNative and PMC members of Apache Pulsar) for their support and help.
