About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight functional computing. Its architecture separates computing from storage, and it supports multi-tenancy, persistent storage, and cross-region data replication across data centers, offering strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

This article was originally posted on InfoQ. Link to the original article: www.infoq.cn/article/m5N…

Introduction to the Kingsoft Cloud log service

Founded in 2012, Kingsoft Cloud is one of the top three internet cloud service providers in China and was listed on NASDAQ in May 2020. Its business covers many countries and regions around the world. Since its founding 8 years ago, Kingsoft Cloud has adhered to a customer-centered service philosophy, providing safe, reliable, stable, and high-quality cloud computing services.

Kingsoft Cloud has set up green, energy-efficient data centers and offices in Beijing, Shanghai, Guangzhou, Hangzhou, Yangzhou, Tianjin, and other domestic regions, as well as in the United States, Russia, Singapore, and other international markets. Going forward, Kingsoft Cloud will continue to serve the local market while taking an international view: by building a global cloud computing network, it aims to connect more devices and people so that the value of cloud computing benefits the whole world.

The Kingsoft Cloud log service is a one-stop system for log data processing, covering everything from log collection and storage to log analysis, real-time consumption, and log delivery. It supports log query and monitoring for multiple lines of business and improves operations and operating efficiency across Kingsoft Cloud's product lines. It currently handles about 200 TB of data per day.

Functions and features of the log service

As a one-stop service system for log data processing, the Kingsoft Cloud log service needs the following features:

  • Data collection: customized development based on Logstash and Filebeat supports more data collection modes.
  • Data query: supports SQL and ElasticSearch Query String syntax.
  • Data consumption: external interfaces are encapsulated on top of Pulsar. Product lines that want to show a rolling log view in the console can do so through the log service's WebSocket protocol, and the full log data can also be consumed (that is, used as a queue) through the exposed REST API.
  • Exception alerting: after data is retrieved on the console, the data and search syntax can be saved as an alert item, with configurable alert policies and notification channels. When an exception is detected, the background starts the corresponding task to generate a real-time alert.
  • Chart display: statements and query results retrieved in the console can be saved as charts (bar charts, line charts, etc.). When you enter the console again, clicking the dashboard shows all the query statements and result data saved so far.
  • Data heterogeneity: you can customize whether logs are delivered to other cloud product lines. For example, the data of certain logs can be sent to object storage, or to a Hive data store for analysis.

Why Pulsar

During our survey, we compared RocketMQ, Kafka, and Pulsar in terms of basic functionality and reliability, and summarized their strengths and weaknesses (see the table below for the comparison results).

We found Pulsar to be a good fit for log stream processing. At the storage level, BookKeeper serves as Pulsar's log stream storage component. Pulsar adopts a cloud-native architecture, and the log service likewise runs in a cloud-native, serverless mode, with all services implemented on the cloud. Pulsar also offers many flexible enterprise-grade features: multi-tenancy, per-tenant storage quotas, data ETL, overall data load-balancing policies, support for transmitting large amounts of data, and more complete message-queue monitoring. Let me elaborate on some of the features for which we chose Pulsar.

Separation of computing and storage

Pulsar producers and consumers connect to brokers. The broker, as a stateless service, can scale horizontally without affecting overall production and consumption. Brokers do not store data; data is stored in the layer below the broker (the bookies), separating computing from storage.

Elastic horizontal scaling

For cloud products, Pulsar can scale brokers without rebalancing data. In contrast, Kafka needs to rebalance before scaling out, which may put the cluster under high load and affect the overall service. In addition, Pulsar topic partitions can be expanded without limit; after expansion, shards and traffic are automatically balanced by the load-balancing strategy.

Pulsar multi-tenancy

Pulsar natively supports multi-tenancy. The logging service also has the concept of a tenant: each product line (that is, each project) belongs to a tenant, which isolates data between product lines. Pulsar clusters support millions of topics (as already demonstrated at Yahoo!), and topics are isolated by tenant. At the tenant level, Pulsar implements features such as storage quotas, message expiration and deletion, and isolation policies, and supports separate authentication and authorization mechanisms.

Load Balancing Policy

Pulsar has the concept of a bundle at the namespace level. If a broker is overloaded, a hot bundle can be split according to the topic-partitioning management policy, and the resulting sub-bundles are automatically rebalanced onto other, less loaded brokers. When a topic is created, Pulsar also preferentially assigns it to a broker with low current load.
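The bundle mechanism can be pictured as hashing topics into ranges that move between brokers as a unit. The following toy simulation (not Pulsar's actual broker code, which hashes into 32-bit ranges) illustrates how topics land in bundles and how splitting a namespace into more bundles spreads its topics more finely:

```python
import hashlib

def bundle_of(topic: str, bundles: int) -> int:
    """Hash a topic name into one of `bundles` ranges, as Pulsar does per namespace.

    Toy model: real Pulsar partitions a 32-bit hash space into ranges; a plain
    modulo captures the same idea that all topics in a bundle move together.
    """
    h = int(hashlib.sha1(topic.encode()).hexdigest(), 16)
    return h % bundles

topics = [f"shard-{i}" for i in range(8)]
# With 4 bundles, several topics share each bundle and are placed as a group:
print({t: bundle_of(t, 4) for t in topics})
# After splitting to 8 bundles, the same topics spread over finer ranges,
# so a hot group can be offloaded to other brokers:
print({t: bundle_of(t, 8) for t in topics})
```

The key property is that placement is deterministic per topic name, so no central table of topic-to-broker assignments is needed; only bundle ownership is tracked.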

Pulsar IO model

During a write operation, the broker writes data to BookKeeper concurrently. Once the bookies report a successful write, the broker only needs to maintain a queue (cache) of recent entries. If consumption is keeping up with production (a tailing read), data can be served directly from the broker without querying a bookie, increasing message throughput. In a catch-up read scenario, historical data must be read from the bookies. Catch-up reads also support data offloading, that is, offloading data to other storage media (such as object storage or HDFS) for cold storage of historical data.
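The tailing-read versus catch-up-read distinction can be sketched as a small in-memory cache in front of durable storage. This is a toy model for illustration only, not Pulsar's implementation:

```python
from collections import deque

class Topic:
    """Toy model: the broker keeps a bounded cache of recent entries;
    reads that fall off the tail must go to the bookies (durable storage)."""

    def __init__(self, cache_size: int = 3):
        self.bookies = []                      # durable storage (BookKeeper, simplified)
        self.cache = deque(maxlen=cache_size)  # broker-side tail cache

    def write(self, entry: str) -> int:
        entry_id = len(self.bookies)
        self.bookies.append(entry)             # quorum write to bookies (simplified)
        self.cache.append((entry_id, entry))   # recent entries stay in broker memory
        return entry_id

    def read(self, entry_id: int) -> tuple[str, str]:
        for eid, e in self.cache:              # tailing read: served from memory
            if eid == entry_id:
                return e, "broker-cache"
        return self.bookies[entry_id], "bookie"  # catch-up read: hits storage

t = Topic()
for i in range(5):
    t.write(f"log-{i}")
print(t.read(4))  # recent entry: served from the broker cache
print(t.read(0))  # old entry: requires a bookie read
```

A real deployment adds the offload tier below the bookies (object storage or HDFS), so the oldest reads go one level deeper still.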

Topic creation, production, and consumption

After a topic is created in the console, the topic and tenant information is recorded in etcd and MySQL. The two services on the right of the diagram then watch etcd. One is the producer service, which listens for topic creation or deletion and performs the corresponding internal operations. The other is the consumer service: after it observes that a new topic has been created, it connects to the Pulsar topic and consumes its data in real time. The producer service receives incoming data and decides which topic to write it to, while the consumer service consumes the data and writes it to ES or another store.

Topic logical abstraction

There are three levels in Pulsar: topic, namespace, and tenant. Since Pulsar does not currently support namespace-level regex consumption, we moved each concept up one level to reduce the work done by the background Flink jobs and achieve consumption at the project level. That is, in the logging service, a Pulsar topic plays the role of a logical shard, and a Pulsar namespace plays the role of a logical topic. With this change, we can dynamically increase and decrease the number of shards, and the background Flink jobs can consume data at the level of a single project.
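Under this mapping, a project's shards are simply the Pulsar topics inside one namespace, so scaling shards means adding or removing topics. A minimal sketch of the bookkeeping (the tenant and project names here are hypothetical):

```python
def shard_topics(tenant: str, project: str, shard_count: int) -> list[str]:
    """A log-service project maps to a Pulsar namespace; each shard is one topic in it."""
    return [f"persistent://{tenant}/{project}/shard-{i}" for i in range(shard_count)]

# Scaling a project from 2 to 4 shards only appends topics under the same
# namespace; the Flink job consuming the project subscribes to the whole list:
print(shard_topics("log-service", "nginx-access", 2))
print(shard_topics("log-service", "nginx-access", 4))
```

One Flink job per project (namespace) then replaces one job per shard, which is the reduction in background tasks the article describes.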

Message subscription model

Pulsar provides four subscription models:

  1. Exclusive mode: when multiple consumers subscribe to a topic with the same subscription name, only one consumer can consume data.
  2. Failover mode: when multiple consumers subscribe to a Pulsar topic under the same subscription name, if the active consumer fails or disconnects, Pulsar automatically switches to the next consumer, preserving single-point consumption.
  3. Shared mode: the most widely used model. If multiple consumers are started under one subscription, Pulsar delivers data to the consumers in turn via round-robin. If one consumer goes down or disconnects, its messages are redelivered to the next consumer. LogHub uses the shared subscription model; the entire Hub runs in containers and can be dynamically scaled up and down based on overall load or other metrics.
  4. Key_Shared mode: messages are hashed by key, so messages with the same key are always delivered to the same consumer, keeping consumption consistent per key.
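The shared and Key_Shared delivery rules above can be simulated in a few lines. This toy dispatcher only illustrates the delivery semantics, not the broker's actual implementation:

```python
import itertools
import zlib

class SharedSubscription:
    """Shared mode: round-robin delivery; a dropped consumer is simply
    removed, and subsequent messages go to the remaining consumers."""

    def __init__(self, consumers):
        self.consumers = list(consumers)
        self._turn = itertools.count()  # ever-increasing delivery counter

    def dispatch(self, msg):
        target = self.consumers[next(self._turn) % len(self.consumers)]
        return target, msg

    def drop(self, consumer):
        # In real Pulsar, unacked messages are redelivered to the rest.
        self.consumers.remove(consumer)

def key_shared_target(key: str, consumers: list) -> str:
    """Key_Shared mode: a stable hash of the key pins every message with
    that key to the same consumer (simplified from Pulsar's hash ranges)."""
    return consumers[zlib.crc32(key.encode()) % len(consumers)]

sub = SharedSubscription(["c1", "c2", "c3"])
print([sub.dispatch(m)[0] for m in "abc"])  # delivered to c1, c2, c3 in turn
sub.drop("c3")                              # later messages skip c3
print(key_shared_target("user-42", ["c1", "c2"]))
```

The contrast is the point: shared mode maximizes parallelism but gives no per-key ordering, while Key_Shared trades some balance for ordered consumption per key.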

Broker fault recovery

Since brokers are stateless, the failure of one broker has no impact on overall production and consumption: another broker fetches the topic metadata from ZooKeeper and automatically takes over as the new owner. The data storage layer is unchanged, and no data needs to be copied within a topic, avoiding data redundancy.

Bookie fault recovery

The bookie layer stores information in fragments. Because bookies keep multiple replicas, when a bookie fails the system reads the corresponding fragments from other bookies and re-replicates them, so writes to the remaining bookies are not affected and the whole topic stays available.
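The re-replication described above can be sketched as copying under-replicated fragments onto a spare bookie. A toy model only (BookKeeper's real auto-recovery is Java code inside the bookies), which also simplifies ensemble placement to round-robin:

```python
import itertools

def place_fragments(fragments: int, bookies: list, replicas: int = 3) -> dict:
    """Round-robin each fragment onto `replicas` bookies (a simplified ensemble)."""
    ring = itertools.cycle(bookies)
    return {f: [next(ring) for _ in range(replicas)] for f in range(fragments)}

def recover(placement: dict, failed: str, spare: str) -> dict:
    """Re-replicate: every fragment that lost a replica on the failed bookie
    copies that replica onto a spare bookie; healthy replicas are untouched."""
    return {f: [spare if b == failed else b for b in bs] for f, bs in placement.items()}

placement = place_fragments(4, ["bookie-1", "bookie-2", "bookie-3", "bookie-4"])
healed = recover(placement, failed="bookie-2", spare="bookie-4")
print(healed)  # no fragment lists bookie-2 anymore; replica count is preserved
```

Because recovery works fragment by fragment from surviving replicas, the copy load spreads across the cluster instead of hammering a single source node.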

Application of Pulsar in the log service

At the bottom of the log service system is the data collection layer, customized from open-source collection tools (such as Logstash, Flume, and Beats). A log pool in the data store is a logical concept that corresponds to a topic in Pulsar. The upper layer of the system consists of query analysis and business applications. Query analysis refers to retrieval and analysis in the log service workbench or queries through SQL syntax; business applications include customizing dashboards and charts on the console and generating real-time alerts. Both support data delivery, that is, transferring log data to other storage media or inexpensive storage, such as KS3-based object storage, an ElasticSearch cluster, or Kafka. The following figure shows the functions of the log service.

Log service architecture design

We designed the layered architecture of the log service according to its product functions (as shown in the following figure). The lowest layer is the data collection end, which gathers log text data, TP/TCP protocol data, log data in MySQL, and so on; development of our own collection agent is still in progress. The collected data is sent to the corresponding Pulsar topic through the data entry point of the logging service. We apply the Pulsar cluster in three areas:

  1. Multi-dimensional statistical analysis through a Flink-on-Pulsar framework, because some business lines need to aggregate log data across multiple dimensions, generate metric results, and store them back to the business line.
  2. LogHub (a microservice), which mainly consumes data from Pulsar topics, writes it directly to ES, and serves log-stream queries and retrieval analysis from the console.
  3. Pulsar Functions: operators or ETL logic configured on the console are executed in the background by the Pulsar Functions module.

We use an ElasticSearch cluster to store retrieval and analysis results, while KS3, KMR, and KES correspond to some of our internal cloud product lines for storage and computing. The data output layer at the top is divided into two modules. One is the Search API module, which exposes APIs externally so that log-related actions on the console are performed by calling them. The other is the Control API module, which supports management operations on the console, such as creating topics, adjusting the number of topic partitions, and configuring retrieval alerts.

Communication design of log service

In the product architecture of the logging service, all services run statelessly, so the various services (especially the producer and consumer services) share data through etcd. That is, after the console performs a create, update, or delete operation, producers and consumers can detect the change and act accordingly. Furthermore, because producers and consumers run entirely in containers and are themselves stateless, they can be dynamically scaled up and down. The communication design diagram of the log service is as follows.

Log flow processing architecture

According to the requirements of log stream processing, we designed the following architecture. On the left is the data collection end, which sends data to the receiving end (PutDatas); the receiving end sends the data to the corresponding Pulsar topic. We apply Pulsar in three scenarios.

  1. A Flink program on top of Pulsar performs customized ETL, multi-dimensional analysis, statistics, aggregation, and other operations.
  2. LogHub consumes and stores data through Pulsar: after consuming data from Pulsar, it writes the collected log data to the ElasticSearch cluster.
  3. Pulsar backs the WebSocket and REST APIs. WebSocket lets you view real-time scrolling logs in the console, and the REST API supports querying data in specific queues. We also implemented some simple ETL with Pulsar Functions, dumping the processed data into a business line's storage medium (such as a data warehouse, Kafka, or an ElasticSearch cluster).
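For the simple ETL mentioned above, Pulsar Functions can be written as a plain Python callable that receives each message and returns the transformed result. The log format and field names below are hypothetical, so this is only a sketch of the kind of transformation involved, not the log service's actual function:

```python
import json

def process(input: str) -> str:
    """A native-style Pulsar Function: parse a 'level|service|message' log
    line (hypothetical format) into a JSON record for downstream storage.

    A function like this would be deployed with something like:
      pulsar-admin functions create --py etl.py --classname etl.process \
        --inputs persistent://tenant/ns/raw --output persistent://tenant/ns/clean
    (topic names here are placeholders)
    """
    level, service, message = input.split("|", 2)
    return json.dumps({"level": level.upper(), "service": service, "message": message})

print(process("warn|gateway|disk usage above 80%"))
```

Keeping the ETL inside Pulsar Functions means it scales with the cluster and needs no separate stream-processing job for simple per-message transformations.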

Future plans

With Pulsar's support, the Kingsoft Cloud log service has been running well. We expect the logging service to support more features and fulfill more requirements. Our plans for the log service are as follows:

  • Add ordered-consumption capability (which may be required in billing and audit scenarios).
  • Merge and split partitions.
  • Implement fully containerized deployment. The internal services of the log service are already containerized; next, we will containerize the deployment of all Pulsar clusters.

At present, the log service supports about 15 internal Kingsoft Cloud product lines (as shown below). Data transmission on a single line is about 200 TB/day, and the number of topics has exceeded 30,000. When AI services are connected to Pulsar, the overall data volume and the number of topics will grow greatly.

During testing and use of Pulsar, we have gained a more comprehensive understanding of it, and we expect Pulsar to support more features, such as removing the dependency on ZooKeeper. At present, ZooKeeper maintains all of Pulsar's metadata and is under great pressure, and the number of Pulsar topics is therefore constrained by the ZooKeeper cluster. If Pulsar's metadata were stored in bookies instead, the number of topics could grow without that limit.

Automatic partition scaling. Log data has peaks and valleys: during peaks, the number of partitions in a topic would be automatically expanded to improve overall concurrency, and during troughs it would be reduced to relieve pressure on cluster resources.

Namespace-level regex subscription. Background Flink tasks could then subscribe to a whole namespace at once instead of monitoring topics individually, reducing the number of background Flink tasks.

Conclusion

Apache Pulsar, as a next-generation cloud-native distributed messaging and streaming platform, has unique advantages and is well suited to our log stream processing scenario. The Pulsar community is very active and responsive. During our initial research, subsequent testing, and launch, our partners at StreamNative gave us great support and helped us bring the service online quickly.

At present, there are more than 30,000 Pulsar topics in the Kingsoft Cloud log service, which processes about 200 TB of data every day and supports 15 product lines. Since launch, Pulsar has run stably, greatly saving our development and operations costs. We look forward to the containerized deployment of Pulsar clusters, to the removal of Pulsar's dependence on ZooKeeper, and to support for automatic partition scaling. We would like to work with the Pulsar community on new features to further accelerate Pulsar's growth.

About the author

Liu Bin is a senior big data development engineer at Kingsoft Cloud.

Related reading

  • Best Practices | Forgoing Ceph, Salesforce uses Apache BookKeeper for strongest storage in the cloud
  • Best Practices | Apache Pulsar technology practices in Lakala
  • Best Practices | Apache Pulsar in Huawei Cloud IoT
