About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. Its architecture separates compute from storage, and it supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers. It offers strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

Device Access Service (IoTDA) is the core service of the Huawei Cloud Internet of Things (IoT) platform, and it requires reliable messaging middleware. We compared the capabilities and features of several messaging middleware products. Apache Pulsar became our first choice for Huawei Cloud IoT because of its multi-tenant design, its architecture that separates compute from storage, and its support for Key_Shared consumption. This article describes how we brought Pulsar into production at Huawei Cloud IoT, the problems we encountered along the way, and how we solved them.

Huawei Cloud Device Access Service

Device Access Service (IoTDA) connects massive numbers of devices to the cloud. It provides two-way messaging between devices and the cloud, data flow, batch device management, remote control and monitoring, OTA upgrades, and device linkage rules. The following figure shows the Huawei Cloud IoT architecture. The upper layer consists of IoT applications such as connected vehicles, smart cities, and smart parks. The device layer connects to the IoT platform directly, through gateways, or through the edge. Huawei Cloud IoT currently has more than 300 million connections, and its IoT platform ranks first in China in competitiveness.

Data flow works as follows: users set rules on the IoT platform, and whenever device behavior meets a rule's conditions, the platform triggers the corresponding rule actions. For example, the platform can forward data to other Huawei Cloud services such as DIS, Kafka, OBS, and InfluxDB, which provide full-stack storage, computing, and analysis of device data. It can also connect to customers' own systems over other protocols such as HTTP and AMQP. In these actions, the IoT platform acts mainly as either a client or a server. Usage scenarios fall into three categories by user type:

  • Large customers tend to push data to messaging middleware (Pulsar, Kafka) and build their own business systems on the cloud to consume and process it.
  • Mid- and long-tail customers often push data to their own databases (such as MySQL) for processing, or have their own HTTP servers receive it.
  • Lighter-weight customers create simple clients that connect over the AMQP protocol.

Pain points of the original push module

The original push module was built on Apache Kafka. It had several drawbacks and was complicated to scale out, which burdened the development and operations teams. In addition, it supported client-type and server-type push but not AMQP push. Its architecture is shown in the following figure. The consumer continuously pulls messages from Kafka and stores failed messages in a database for retry. This way of working causes several problems:

  • Even if many customers' servers are unreachable, the consumer still has to pull their messages from Kafka (because there is only one Kafka topic) and try to send them.
  • Message retention time and size cannot be configured per user.
  • Some customers have weak servers, yet the rate at which messages are pushed to an individual customer cannot be controlled.

Number of topics

In May 2020, to make our product more competitive, we planned to let customers receive flowed data over the AMQP protocol. AMQP client access is more complex. Customers may integrate the AMQP client into a mobile app that is online for only about two hours at a time. In such cases we must make sure customers lose no data. The messaging middleware therefore has to support at least as many topics as there are rules (for some customers, a single rule carries so much data that a single topic cannot handle it). We currently have more than 30,000 rules, expect to reach 50,000 soon, and the number keeps growing.

Under the hood, each Kafka topic consumes file handles and shares the OS page cache, so Kafka cannot support a large number of topics. Kafka services from other vendors support at most 1,800 topics. To support queues at our scale, we would have to maintain multiple Kafka clusters, as shown in the following figure.

A Kafka-based solution would be very complex. We would need to manage the life cycle of multiple Kafka clusters as well as the mapping between tenants and Kafka clusters. Kafka does not support a Shared consumption model, so two layers of relaying would be required. In addition, suppose a Kafka cluster has already reached its topic limit and a tenant needs more topics because of heavy data flow. The original cluster then cannot be expanded without migrating data. The overall scheme is very complex and poses a great challenge for development and operations.
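To make this bookkeeping concrete, here is a minimal, hypothetical sketch. It assumes the 1,800-topic limit above and a simple first-fit placement policy, which is an assumption rather than a design we ran. It computes how many clusters a given rule count needs and keeps the tenant-to-cluster map a multi-cluster design would have to maintain.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the bookkeeping a multi-cluster Kafka design needs.
// The 1,800-topic limit comes from the text; first-fit placement is an assumption.
final class KafkaClusterPlan {
    static final int TOPICS_PER_CLUSTER = 1_800;

    /** Ceiling division: clusters required to host the given number of topics. */
    static int clustersNeeded(int topics, int topicsPerCluster) {
        return (topics + topicsPerCluster - 1) / topicsPerCluster;
    }

    private final int[] topicCounts; // topics already placed on each cluster
    private final Map<String, Integer> tenantToCluster = new HashMap<>();

    KafkaClusterPlan(int clusters) {
        this.topicCounts = new int[clusters];
    }

    /** Places a tenant's topics on the first cluster with room; returns its index, or -1 if none fits. */
    int assign(String tenant, int topics) {
        for (int i = 0; i < topicCounts.length; i++) {
            if (topicCounts[i] + topics <= TOPICS_PER_CLUSTER) {
                topicCounts[i] += topics;
                tenantToCluster.put(tenant, i);
                return i;
            }
        }
        return -1; // no room anywhere: a new cluster and a data migration would be needed
    }

    Integer clusterOf(String tenant) {
        return tenantToCluster.get(tenant);
    }

    public static void main(String[] args) {
        System.out.println(clustersNeeded(30_000, TOPICS_PER_CLUSTER)); // 17
        System.out.println(clustersNeeded(50_000, TOPICS_PER_CLUSTER)); // 28
    }
}
```

Even in this toy version, a tenant that outgrows its cluster has nowhere to go except a new cluster plus a migration. That is exactly the operational burden described above.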

Why Pulsar

To solve these Kafka problems, we surveyed popular messaging middleware and learned about Apache Pulsar. Apache Pulsar is a cloud-native distributed messaging and streaming platform. It natively supports many excellent features, including its distinctive Key_Shared mode and support for millions of topics.

  • Pulsar supports Key_Shared mode. Suppose a single Pulsar partition handles 3,000 QPS while a customer's AMQP client can handle only 300 QPS. The best solution is then Pulsar's Key_Shared mode with multiple client connections, so that 10 clients connect at the same time and process the data together. With Failover mode, the topic would instead need 10 partitions, which wastes resources.
  • Pulsar scales to millions of topics, so we can map each rule to its own Pulsar topic. When an AMQP client comes online, it resumes reading from its last consumed position, so no messages are lost.
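The Key_Shared arithmetic above can be sketched as follows. This is a simplified model, not Pulsar's implementation. Pulsar uses its own key hash and range-assignment policy, while the sketch substitutes `String.hashCode` and an even split of a 65,536-slot range as stand-ins. It shows the key property: every message with the same key (for example, the same device) goes to the same consumer, while the load spreads across the 10 clients.

```java
// Simplified model of Key_Shared routing (NOT Pulsar's actual implementation):
// each key is hashed into a fixed slot range, and the range is split evenly among
// the connected consumers, so all messages with the same key reach the same consumer.
final class KeySharedSketch {
    static final int RANGE_SIZE = 1 << 16; // 65,536 slots, chosen for illustration

    /** Consumers needed so that each one stays within the client's capacity. */
    static int consumersNeeded(int partitionQps, int clientQps) {
        return (partitionQps + clientQps - 1) / clientQps;
    }

    /** Maps a key to a consumer index in [0, consumers). */
    static int consumerFor(String key, int consumers) {
        int slot = Math.floorMod(key.hashCode(), RANGE_SIZE); // stand-in for Pulsar's key hash
        int slotsPerConsumer = (RANGE_SIZE + consumers - 1) / consumers;
        return slot / slotsPerConsumer;
    }

    public static void main(String[] args) {
        int consumers = consumersNeeded(3_000, 300); // 10 AMQP clients
        for (String deviceId : new String[] {"device-1", "device-2", "device-1"}) {
            System.out.println(deviceId + " -> consumer " + consumerFor(deviceId, consumers));
        }
    }
}
```

With Failover mode, the only way to get 10 active consumers would be 10 partitions. Key_Shared gets the same parallelism from a single partition.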

Pulsar has a cloud-oriented multi-tenant design, whereas Kafka is geared more toward system-to-system integration with a single tenant and high throughput. Pulsar is designed with Kubernetes deployment in mind, so overall deployment is easy. Because Pulsar separates compute from storage, scaling out is simple. Expanding a topic causes only a short interruption, and with retries the service is not interrupted at all. Pulsar also supports shared subscription types, which is more flexible. We compared Pulsar and Kafka across several dimensions, with the following results:

Pulsar not only solved our problems with Kafka, but its feature set also fit our needs perfectly, so we decided to try it.

Design

In our initial design, we planned to use the Key_Shared consumption mode for both client-type and server-type push. The following figure shows the client-type design, using HTTP as an example. Each time a customer configures a data flow rule, we create a topic in Pulsar. A consumer consumes that topic and pushes the messages to the customer's HTTP server through a NAT gateway.

The following figure shows the server-type design, using AMQP as an example. If the customer has not connected an AMQP client, a consumer that starts pulling data has nowhere to forward it. The flow therefore waits for the client. Once the customer's client connects to a Consumer microservice instance through the load balancer, that instance starts the corresponding consumer. Each AMQP connection corresponds to one consumer.
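The "one AMQP connection, one consumer" rule can be sketched as a small registry. In this hypothetical sketch, `AmqpConsumerRegistry`, `RuleConsumer`, and the factory are illustrative names, not part of Pulsar or our code. In a real deployment the factory would create and subscribe a Pulsar consumer for the rule's topic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: one consumer per AMQP connection, created on connect and
// closed on disconnect. RuleConsumer stands in for a real Pulsar consumer.
final class AmqpConsumerRegistry {
    interface RuleConsumer extends AutoCloseable {
        @Override
        void close(); // narrowed to throw no checked exception
    }

    private final Map<String, RuleConsumer> byConnection = new ConcurrentHashMap<>();
    private final Function<String, RuleConsumer> consumerFactory; // rule topic -> started consumer

    AmqpConsumerRegistry(Function<String, RuleConsumer> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /** Called once the client's AMQP connection reaches this instance via the load balancer. */
    void onConnect(String connectionId, String ruleTopic) {
        // Consume only now: without a connected client there is nowhere to forward data.
        byConnection.computeIfAbsent(connectionId, id -> consumerFactory.apply(ruleTopic));
    }

    /** Called when the AMQP connection closes; unacknowledged messages stay in the subscription. */
    void onDisconnect(String connectionId) {
        RuleConsumer consumer = byConnection.remove(connectionId);
        if (consumer != null) {
            consumer.close();
        }
    }

    int activeConsumers() {
        return byConnection.size();
    }
}
```

Because a durable subscription keeps its position while no consumer is attached, the next connection for the same rule resumes where the previous one stopped.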

The throughput of a single partition in a Pulsar cluster is limited, so some topics need more partitions. This happens when the data volume of a single customer's rule exceeds that limit: for example, a topic's performance specification is about 3,000 QPS but the customer's estimated traffic is 5,000 QPS. To avoid restarting producers and consumers when partitions are added, we set the autoUpdatePartition parameter to true so that producers and consumers detect partition changes dynamically.
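The sizing rule in this example is a ceiling division. Here is a minimal sketch, assuming per-partition capacity is a figure you measure yourself (about 3,000 QPS here) rather than a Pulsar constant. In the Pulsar Java client, the dynamic-partition behavior mentioned above corresponds to the `autoUpdatePartitions(true)` option on the producer and consumer builders.

```java
// Worked arithmetic for sizing a partitioned topic. Per-partition capacity is a
// measured figure (about 3,000 QPS in the text), not a Pulsar constant.
final class PartitionSizing {
    /** Smallest partition count whose combined capacity covers the expected traffic. */
    static int partitionsFor(int expectedQps, int perPartitionQps) {
        if (expectedQps <= 0 || perPartitionQps <= 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        return (expectedQps + perPartitionQps - 1) / perPartitionQps;
    }

    public static void main(String[] args) {
        // 5,000 QPS expected at ~3,000 QPS per partition
        System.out.println(partitionsFor(5_000, 3_000)); // 2
    }
}
```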

Problems found when testing the first design

When testing the initial design, we found problems in three main areas:

  • With this design, client-type push forms a full mesh between microservice instances and consumers. With 10,000 customer rules and four microservice instances, there are 40,000 consumer-subscription relationships, and each instance holds 10,000 consumers in memory at the same time. The consumer receive queue size is critical for throughput and memory consumption, but it is hard to configure:
    • If it is too large and consumers cannot send HTTP messages in abnormal scenarios, a large number of messages pile up in the consumers and cause out-of-memory errors. For example, with 1,000 consumers and a sudden 5-minute network outage, all messages from those 5 minutes are stored in the receive queues.
    • If it is too small, communication between the consumers and the server is inefficient, which hurts system performance.
  • During rolling upgrades of Pulsar or of the production and consumption services, frequent requests for topic metadata put heavy pressure on the cluster. The number of requests is the number of instances multiplied by the number of topics.
  • autoUpdatePartition consumes system resources: if it is enabled for every topic by default, each topic sends a ZooKeeper request every minute.
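The three problems above are all multiplicative, which a back-of-the-envelope model makes visible. In this sketch, the rule, instance, and topic counts come from the text. The receive queue size of 1,000 matches the Pulsar Java client's default `receiverQueueSize`, and the 1 KiB average message size is a hypothetical input. Note that each partition consumer of a partitioned topic has its own queue, so real numbers can be higher.

```java
// Back-of-the-envelope model of the first design's costs. Counts follow the text;
// the 1 KiB message size is a hypothetical input.
final class FirstDesignCosts {
    /** Every instance subscribes to every rule topic: a full mesh. */
    static long subscriptions(long rules, long instances) {
        return rules * instances;
    }

    /** Upper bound on bytes held in receive queues when nothing can be delivered. */
    static long worstCaseBufferedBytes(long consumers, long receiverQueueSize, long avgMessageBytes) {
        return consumers * receiverQueueSize * avgMessageBytes;
    }

    /** Topic-metadata requests during a full restart: instances x topics. */
    static long metadataRequestsOnRestart(long instances, long topics) {
        return instances * topics;
    }

    /** Partition-update checks per hour when every topic polls once a minute. */
    static long partitionChecksPerHour(long topicsWithAutoUpdate) {
        return topicsWithAutoUpdate * 60;
    }

    public static void main(String[] args) {
        System.out.println(subscriptions(10_000, 4));                     // 40000
        System.out.println(worstCaseBufferedBytes(10_000, 1_000, 1_024)); // 10240000000 (~10 GB per instance)
        System.out.println(metadataRequestsOnRestart(4, 10_000));         // 40000
        System.out.println(partitionChecksPerHour(10_000));               // 600000
    }
}
```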

We reported these problems to the Pulsar community, and the StreamNative team gave us great support. They suggested that we divide customers into groups and then set autoUpdatePartition according to each group's needs. With this strong community support, we made the corresponding improvements and began planning the production rollout.

Production rollout plan

Our customers fall roughly into two types:
  • Busy users push a large amount of data upstream when their business is busy. A single partition may not meet their needs, but there are few of them.
  • Users with stable business push moderate amounts of data. A single partition is sufficient, but there are many of them.

Following this suggestion, we grouped users. Busy users get a dedicated push workload, while users with moderate volumes share a common workload. Currently, SREs assign customers to groups manually in the configuration center according to their business volume. In the future, we will group customers automatically based on real-time statistics. Grouping greatly reduces the number of topic-consumer combinations and also reduces the number of metadata requests at restart. After grouping, the two workloads also use different client parameters:
  • autoUpdatePartition is enabled only for busy users' topics.
  • The two workloads use different receive queue sizes.
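Per-group client settings could be expressed roughly as follows. This is a hypothetical sketch: only the two differences listed above come from our setup. The configuration-center map, the MEDIUM default for unlisted tenants, and the queue sizes are illustrative assumptions. The two fields would feed the Java client's `autoUpdatePartitions` and `receiverQueueSize` builder options.

```java
import java.util.Map;

// Hypothetical per-group client settings. Only "autoUpdatePartitions for busy users only"
// and "different receive queue sizes" come from the text; the values are illustrative.
final class GroupSettings {
    enum Group { BUSY, MEDIUM }

    final boolean autoUpdatePartitions; // enabled only for busy users' topics
    final int receiverQueueSize;        // differs between the two workloads

    private GroupSettings(boolean autoUpdatePartitions, int receiverQueueSize) {
        this.autoUpdatePartitions = autoUpdatePartitions;
        this.receiverQueueSize = receiverQueueSize;
    }

    static GroupSettings forGroup(Group group) {
        switch (group) {
            case BUSY:
                return new GroupSettings(true, 1_000); // hypothetical queue size
            case MEDIUM:
                return new GroupSettings(false, 100);  // hypothetical queue size
            default:
                throw new IllegalArgumentException("unknown group " + group);
        }
    }

    /** Group assigned by SRE in the configuration center; unlisted tenants default to MEDIUM. */
    static Group groupOf(String tenant, Map<String, Group> manualAssignments) {
        return manualAssignments.getOrDefault(tenant, Group.MEDIUM);
    }
}
```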

Deployment

We use containerized deployment for both user groups. The Broker runs as a Kubernetes Deployment, while BookKeeper and ZooKeeper run as StatefulSets. Deployment scenarios include cloud and edge deployments, which have different reliability and performance requirements. The deployment parameters are as follows:

During deployment, we found that:

  • Write performance is best when a topic's Ensemble Size (E) and Write Quorum Size (Qw) are equal.
  • The cloud carries a large volume of messages. With 2 replicas, 100% redundant capacity is required; with 3 replicas, only 50% is required.
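Both observations can be checked mechanically. In BookKeeper, the ensemble must be at least the write quorum, which must be at least the ack quorum. When the ensemble is larger than the write quorum, entries are striped across bookies. For the redundancy figures, one reading consistent with both numbers is the spare capacity the surviving replicas need to absorb one failed replica's load, 1/(n−1). Treat that formula as our interpretation, not an official rule.

```java
// Sketch of two deployment rules of thumb. The quorum ordering check is a BookKeeper
// constraint; the 1/(n-1) redundancy formula is an interpretation of the 100%/50% figures.
final class BookieSizing {
    static void validate(int ensemble, int writeQuorum, int ackQuorum) {
        if (!(ensemble >= writeQuorum && writeQuorum >= ackQuorum && ackQuorum >= 1)) {
            throw new IllegalArgumentException("require ensemble >= writeQuorum >= ackQuorum >= 1");
        }
    }

    /** True when entries are striped across bookies (ensemble larger than write quorum). */
    static boolean stripes(int ensemble, int writeQuorum) {
        validate(ensemble, writeQuorum, 1);
        return ensemble > writeQuorum;
    }

    /** Spare capacity (as a fraction) so n-1 surviving replicas can absorb one lost replica's load. */
    static double spareCapacityForOneFailure(int replicas) {
        if (replicas < 2) {
            throw new IllegalArgumentException("need at least 2 replicas");
        }
        return 1.0 / (replicas - 1);
    }

    public static void main(String[] args) {
        System.out.println(spareCapacityForOneFailure(2)); // 1.0 -> 100%
        System.out.println(spareCapacityForOneFailure(3)); // 0.5 -> 50%
    }
}
```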

Pulsar tuning solution

This solution went into production successfully half a year ago. We also tested 50,000 topics in a test environment at 100,000 messages per second. During testing we ran into some problems and tuned Pulsar case by case; for details, see "Pulsar 50,000 Topic tuning". This section focuses on latency, ports, and suggestions for improvement.

Reduce production/consumption delays

Using a test tool, we found that the overall end-to-end message latency was high.

To make problems easier to locate, we developed a single-topic debug feature. In high-volume scenarios, enabling global debug logging on the Broker is impractical in both test and production environments. Instead, we added a configuration item in the configuration center so that detailed debug information is printed only for topics in a configured list. With single-topic debugging, we quickly found that most of the latency occurred between the producer sending a message and the server receiving it. The likely cause was too few Netty threads.
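The idea behind single-topic debugging is a per-topic gate in front of the logger. This hypothetical sketch is not a Pulsar broker API. `TopicDebugFilter`, its refresh hook, and the topic names are illustrative. It uses `java.util.logging` so that it stays self-contained, and it logs at INFO so the per-topic gate, not the global level, decides what is printed.

```java
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: detailed logs only for topics listed in a configuration-center entry.
final class TopicDebugFilter {
    private static final Logger LOG = Logger.getLogger(TopicDebugFilter.class.getName());

    // Replaced atomically as a whole, so readers never see a half-updated set.
    private volatile Set<String> debugTopics = Set.of();

    /** Called when the configuration center pushes a new topic list. */
    void refresh(Set<String> topicsFromConfig) {
        debugTopics = Set.copyOf(topicsFromConfig);
    }

    boolean isDebugEnabled(String topic) {
        return debugTopics.contains(topic);
    }

    /** Logs only for watched topics; the supplier avoids building messages for all others. */
    void debug(String topic, Supplier<String> message) {
        if (isDebugEnabled(topic)) {
            LOG.log(Level.INFO, () -> "[" + topic + "] " + message.get());
        }
    }
}
```

Because the message is built lazily, unwatched topics pay only for one set lookup. That cost stays low even when message volume is high.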

However, increasing the number of Netty threads did not fully solve the problem, because a single JVM instance was still a performance bottleneck. As mentioned above, after grouping by data volume, the small-user group had to serve about 40,000 topics. This caused slow startup (and therefore long interruptions during upgrades), insufficient in-memory queues, and complex scheduling. We finally hashed the small-user group again so that each instance is responsible for about 10,000 consumers. This solved the high production and consumption latency.
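The re-hashing step might look like the following sketch. It assumes plain modulo hashing of topic names, which is one possible scheme rather than necessarily ours. With about 40,000 topics and about 10,000 consumers per instance, this yields 4 shards. Every instance computes the same owner for a topic independently, so no coordinator is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split the small-user group's topics across instances so each
// owns roughly 10,000 consumers. Modulo hashing is an assumed scheme.
final class TopicSharding {
    static int shardCount(int topics, int consumersPerInstance) {
        return (topics + consumersPerInstance - 1) / consumersPerInstance;
    }

    /** Deterministic owner of a topic: all instances agree without coordination. */
    static int ownerOf(String topic, int shards) {
        return Math.floorMod(topic.hashCode(), shards);
    }

    /** Topics this instance should consume, out of the full rule-topic list. */
    static List<String> topicsFor(int instanceIndex, int shards, List<String> allTopics) {
        List<String> mine = new ArrayList<>();
        for (String topic : allTopics) {
            if (ownerOf(topic, shards) == instanceIndex) {
                mine.add(topic);
            }
        }
        return mine;
    }
}
```

One trade-off of plain modulo hashing: changing the shard count moves most topics to new owners. A consistent-hashing ring would move fewer topics if instances are added often.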

Connect to the broker using port 8080

We use port 8080 to connect to the broker instead of port 6650 for two main reasons:

  • Logs are detailed. Most requests sent to port 8080 are metadata requests, which helps troubleshooting and makes monitoring easy. For example, Jetty's requestLog makes it easy to spot a failed topic creation, a producer creation timeout, and so on.
  • Data requests and metadata requests are isolated, so topic creation and deletion are not blocked when port 6650 is busy.

Compared with port 6650, port 8080 is less efficient. At a scale of 50,000 topics, upgrading producers/consumers or brokers triggers a large number of requests to port 8080 to create producers and consumers, such as partition metadata and lookup requests. We solved this by increasing the number of Jetty threads in Pulsar.
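To show what "metadata requests on port 8080" look like, this sketch builds the two REST paths involved for a persistent topic: partition metadata and topic lookup. We believe these match the Pulsar Java client's HTTP lookup paths when it is given an `http://` service URL, though newer clients may add query parameters. The host and topic names are placeholders. On the broker side, the Jetty worker pool is sized by the `numHttpServerThreads` setting in `broker.conf`.

```java
// Sketch of the HTTP metadata requests sent to the broker's web port (8080).
// Host and topic names are placeholders; newer clients may append query parameters.
final class BrokerHttpPaths {
    /** Splits "persistent://tenant/namespace/topic" into its three parts. */
    static String[] parse(String topic) {
        String prefix = "persistent://";
        if (!topic.startsWith(prefix)) {
            throw new IllegalArgumentException("expected a persistent topic: " + topic);
        }
        String[] parts = topic.substring(prefix.length()).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + topic);
        }
        return parts;
    }

    /** Partitioned-topic metadata request. */
    static String partitionsUrl(String webServiceUrl, String topic) {
        String[] p = parse(topic);
        return webServiceUrl + "/admin/v2/persistent/" + p[0] + "/" + p[1] + "/" + p[2] + "/partitions";
    }

    /** Topic lookup request (which broker owns the topic). */
    static String lookupUrl(String webServiceUrl, String topic) {
        String[] p = parse(topic);
        return webServiceUrl + "/lookup/v2/topic/persistent/" + p[0] + "/" + p[1] + "/" + p[2];
    }

    public static void main(String[] args) {
        String broker = "http://broker.example:8080";    // placeholder host
        String topic = "persistent://iot/rules/rule-42"; // hypothetical topic
        System.out.println(partitionsUrl(broker, topic));
        // http://broker.example:8080/admin/v2/persistent/iot/rules/rule-42/partitions
        System.out.println(lookupUrl(broker, topic));
        // http://broker.example:8080/lookup/v2/topic/persistent/iot/rules/rule-42
    }
}
```

During an upgrade, every producer and consumer issues both requests. At 50,000 topics, that volume is what exhausted the Jetty threads.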

Suggestions for improvement

Pulsar's operations and maintenance (O&M) support is still somewhat weak. We hope Pulsar can improve in the following areas:

  • Automatically tune client parameters such as the send queue size and receive queue size, so that running tens of thousands of topics is smoother.
  • Expose an API for unrecoverable errors (such as a ZooKeeper failure) so that we can easily report them to the alarm platform on the cloud.
  • Support tracing a single topic through logs at runtime, so that O&M staff can combine tools such as Kibana with service logs to locate and solve problems quickly.
  • Sample key points of production, consumption, and storage inside Pulsar and export the traces to APM systems such as SkyWalking, to make performance analysis and optimization easier.
  • Take the number of topics into account in the load-balancing strategy.
  • Monitor BookKeeper journal disk usage; currently only data disk usage is monitored.

Conclusion

It took us three to four months to go from first trying Pulsar to launching our design. Since going live, Pulsar has run stably and performed well, helping us meet our goals. It greatly simplifies the overall architecture of the Huawei Cloud IoT data access service and supports our new business smoothly with low latency, so we can focus on improving business competitiveness. Because Pulsar performs so well, we have also applied it to our data analysis services. We hope to use Pulsar Functions in our business to further improve our product's competitiveness.

About the author

Zhang Jian: Senior engineer at Huawei Cloud IoT, focusing on cloud native, IoT, messaging middleware, and APM.
