Brief introduction: In this presentation, StreamNative co-founder Jia Zhai introduced Apache Pulsar, the next generation of cloud-native message flow platform, and explained how Apache Pulsar’s native storage and computation separation architecture provides the foundation for batch stream convergence. And how Apache Pulsar combined with Flink, to achieve batch flow integrated calculation.
Apache Pulsar is relatively new, having joined the Apache Software Foundation in 2017 and only graduated from the Apache Software Foundation as a top project in 2018. Pulsar has attracted more and more attention from developers due to its native architecture of storage and computation separation and BookKeeper, a storage engine designed specifically for messages and streams. Today’s sharing is divided into three parts:
- Apache Pulsar is what;
- Pulsar data view;
- Pulsar merges with Flink’s batch stream.
What is Apache Pulsar
The following is a list of open source tools that belong to the messaging space, which will be familiar to any messaging or infrastructure developer. Although Pulsar began development in 2012 and wasn’t open source until 2016, it had been running on Yahoo’s network for a long time before it was introduced to the public. This is why it has attracted the attention of many developers since it was opened source, and it is already an online proven system.
Pulsar is fundamentally different from other messaging systems in two ways:
- On the one hand, Pulsar adopts the cloud native architecture of storage and computing separation.
- Pulsar, on the other hand, has a storage engine designed specifically for messages, Apache BookKeeper.
architecture
The following diagram shows the architecture of the Pulsar storage computation separation:
- First, the Pulsar Broker does not store any state data and does not do any data storage at the compute layer, also known as the service layer.
- Second, Pulsar has a storage engine, BookKeeper, designed specifically for messages and flows, which we also call the data layer.
This layered architecture is convenient for user cluster scaling:
- If you want to support more producers and consumers, you can extend the stateless Broker layer above.
- If you want to do more data storage, you can expand the underlying storage layer separately.
This cloud-native architecture has two main features:
- The first is the separation of storage computing;
- Another feature is that each layer is a node – to – peer architecture.
In terms of node peer, the Broker layer does not store data, so node peer is easy to implement. However, the underlying storage of Pulsar is also a node peer state: in the storage layer, BookKeeper does not use master/slave synchronization, but Quorum.
If multiple data backups are maintained, users can write to three storage nodes simultaneously through a broker. Each piece of data is in a peer state, so that the underlying node is also in a peer state. It is easy for users to scale and manage the underlying node. Such a peer-to-peer basis will bring great cloud native convenience to users, facilitate users to expand capacity in each layer separately, and also improve the availability and maintainability of users’ online systems.
At the same time, this layered architecture laid the foundation for our batch stream fusion at Flink. Because it is divided into two layers, it can provide two different sets of APIs according to the user’s usage scenarios and different access modes of batch stream.
- For real-time data access, this can be done through the Consumer interface provided by the upper level Broker.
- For historical data access, you can skip the Broker and use the Reader interface of the storage layer to access the underlying storage layer directly.
Store the BookKeeper
Another advantage of Pulsar is that it has Apache BookKeeper, a storage engine designed specifically for streams and messages. It is a simple write-ahead log abstraction. The Log abstraction is similar to the stream abstraction in that all data is appended directly from the tail in a continuous stream.
The advantage it brings to users is that the write mode is relatively simple, which can lead to relatively high throughput. In terms of consistency, BookKeeper combines Paxos and ZooKeeper Zab protocols. What BookKeeper exposes to you is a log abstraction. You can simply assume that it is very consistent and can implement a RAFT like log layer storage. BookKeeper was born to serve HA in our HDFS naming node, a scenario in which consistency is particularly critical. This is why Pulsar and BookKeeper are used for storage in many critical scenarios.
The design of BookKeeper has special read and write isolation. The simple understanding is that reads and writes occur on different disks. Such are the benefits of the batch flow integration scenarios can reduce the interference and historical data is read, a lot of times a user to read the latest real-time data, inevitably read historical data, if there is a dedicated for historical data and a separate disk, historical data and real-time data read and write there won’t be for IO, It will bring a better experience to batch stream integrated IO services.
Application scenarios
Pulsar scenes are widely used. The following are some common application scenarios of Pulsar:
- First, because Pulsar has Bookkeeper, the data consistency is particularly high, and Pulsar can be used in the scene of high demand for data service quality, consistency and availability, such as the billing platform, payment platform and transaction system.
- The second application scenario is Worker Queue/Push Notifications/Task Queue, which is designed to decouple systems from each other.
- The third scenario is related to Pulsar’s support for both message and queue scenarios. Pulsar supports the Queue consumption model, as well as Kafka’s high-bandwidth consumption model. I’ll focus on the benefits of Queue’s consumption model combined with Flink later.
- The fourth scenario is IoT application, because Pulsar has MQTT protocol parsing on the server side, as well as lightweight computational Pulsar Functions.
- The fifth aspect is Unified Data Processing, which uses Pulsar as the basis for a batchstream fusion storage.
We invited more than 40 lecturers to share their Pulsar landing cases at the Pulsar Summit Asia in late November 2020. If you’re interested in the Pulsar scenario, follow StreamNative on site B and watch the video.
2. Data view of Pulsar
In these application scenarios, Unified Data Processing is particularly important. Regarding batch stream fusion, the first reaction of many domestic users is to choose Flink. What are the advantages of Pulsar combined with Flink? Why do users choose Pulsar and Flink for batch stream fusion?
First, let’s expand from the data view of Pulsar. Like other messaging systems, Pulsar is message-centric and topic-centric. All the data are handed over by the producer to the topic, and then the consumer subscribes to the consumption news from the topic.
Partition Partition
To facilitate scaling, Pulsar also has the concept of partitioning within the topic, similar to many messaging systems. As mentioned above, Pulsar is a layered architecture that uses partitioning to expose topics to users, but internally, each partition can actually be sliced into a shard based on the time or size specified by the user. A Topic is initially created with only one active shard, and a new shard is sliced when the time specified by the user is up. During the process of opening a new shard, the storage layer can select the node with the most capacity to store the new shard according to the capacity of each node.
The advantage of this is that each shard of the topic will be evenly distributed on each node of the storage layer, and the data storage will be balanced. If the user wishes, the entire storage cluster can be used to store partitions, no longer limited by the capacity of a single node. As shown in the figure below, the Topic has four partitions, and each partition is split into multiple shards. Users can split a shard by time (say 10 minutes or 1 hour) or by size (say 1G or 2G). The sharding itself is sequential, incrementing by ID, and all messages within the shard are monotonically incrementing by ID, which makes it easy to guarantee sequentiality.
Stream flow storage
Let’s look at the concept of data processing in a common stream again from a single shard. All of the user’s data is constantly appended from the end of the stream. Similar to the concept of streams, new data for topics in Pulsar is constantly added to the end of the Topic. In contrast, the Topic abstraction in Pulsar provides some advantages:
- First, it uses an architecture that separates storage from computing. In the computing layer, it is more of a message service layer, which can quickly return the latest data to users through the Consumer interface, so that users can obtain the latest data in real time.
- Another advantage is that it is divided into multiple shards. If the user specifies the time, the corresponding shard can be found from the metadata. The user can bypass the real-time stream and read the shard of the storage layer directly.
- Another advantage is that Pulsar can provide unlimited streaming storage.
For those of you who are infrastructure people, if you see a time-shard architecture, it’s easy to think of moving the old shards into secondary storage, which is what we did in Pulsar. Users can set up to automatically move old or out-of-date or out-of-size data to secondary storage based on the consumption popularity of the topic. Users can choose to use Google, Microsoft Azure or AWS to store old shards, and HDFS storage is also supported.
The advantage of this is that the latest data can be returned quickly through BookKeeper, and the old cold data can be stored in an unlimited stream using the network storage cloud resources. This is why Pulsar can support unlimited stream storage and is a basis for batch stream fusion.
In general, Pulsar provides two different access interfaces for real-time and historical data by separating storage and computation. Users can choose which interface to use to access data according to different internal sharding locations and metadata. At the same time, the old shards can be put into secondary storage according to the sharding mechanism, which can support unlimited stream storage.
Pulsar’s continuum is now on the shard metadata management aspect. Each shard can be stored in different storage media or formats according to time, but Pulsar provides a logical concept of partition by managing the metadata of each shard. When accessing a shard in a partition, I can get its metadata, know its order in the partition, data storage location and storage type, and Pulsar manages the metadata of each shard, providing a unified topic abstraction.
Batch stream fusion of Pulsar and Flink
Streams are a fundamental concept in Flink, and Pulsar can be used as a carrier for streaming to store data. If the user does a batch calculation, it can be considered a bounded stream. For Pulsar, this is a Topic bounded by sharding.
As we can see from the figure, Topic has many shards. If the starting and ending time is determined, the user can determine the range of shards to read based on this time. For real-time data, the corresponding is a continuous query or access. In the case of Pulsar, it means constantly consuming the tail data of the Topic. In this way, the Topic model of Pulsar can be well integrated with the concept of Flink streams, and Pulsar can be used as a carrier for Flink stream computation.
- The bounded computation can be regarded as a bounded stream, which corresponds to some limited sharding of Pulsar.
- Real-time computing is an unbounded stream that queries and accesses the latest data in a Topic.
Pulsar adopts different response modes for bounded and unbounded streams:
-
The first is the response to historical data. As shown in the figure below, the lower left corner is the user’s query, with the range of a time-limited stream given the start and end. The response to Pulsar is divided into several steps:
- The first step is to find the Topic. According to the metadata we uniformly manage, we can get the list of all the shard metadata in this Topic.
- The second step is to limit the metadata in the list according to the time, obtain the starting and ending shards through two-point search, and select the shards that need to be scanned.
- Thirdly, after finding these shards, access the shards that need to be accessed through the interface of the underlying storage layer to complete a search of historical data.
- For real-time data search, Pulsar also provides the same interface as Kafka. You can read the most extreme shard (i.e., the latest data) through the Consumer interface, and access the data in real time through the Consumer interface. It constantly looks up the latest data, and when it’s done, it looks up again. In this case, using the Pulsar Pub/Sub interface is the most direct and effective way.
In short, Flink provides a unified view that allows users to work with streaming and historical data using a unified API. Where once a data scientist might have written two sets of applications, one for real-time data and one for historical data, now a single set of models can solve this problem.
Pulsar mainly provides a data carrier, which provides the storage carrier of the stream for the upper computing layer through a partition-based sharding architecture. Because Pulsar uses a tiered, sharded architecture, it has the latest data access interface for streams, as well as a storage layer access interface for batches that requires more concurrency. At the same time, it provides unlimited streaming storage and a unified consumption model.
IV. Current Capabilities and Progress of PULSAR
Finally, let’s talk a little bit about Pulsar’s current capabilities and recent developments.
Existing capabilities
schema
Schema is a particularly important abstraction in big data. The same is true in the field of message. In Pulsar, if the producer and consumer can sign a set of agreements through a schema, there is no need for the users of the producer and consumer to communicate the format of sending and receiving data offline. We need the same support in the computing engine.
In the PULSAR-FLINK Connector, we use the interface of FLINK Schema to connect with the schema in PULSAR, and FLINK can directly analyze the schema stored in PULSAR data. There are two types of schema:
- The first is the common metatData for each message, which includes information about the key of the message, the time when the message was generated, or other metadata.
- The other is the description of the data structure of the content of the message. The common format is Avro format. The user can know the corresponding data structure of each message through the Schema when accessing it.
At the same time, we combined FLIP-107, integrated Flink Metadata Schema and Avro’s Metadata, and could combine the two schemas together to do more complex queries.
source
With this schema, users can easily use it as a source because it can understand each message from the information in the schema.
Pulsar Sink
We can also send the result of the calculation in Flink back to Pulsar as Sink.
Streaming Tables
With Sink and Source support, we can expose the Flink Table directly to the user. Users can simply use Pulsar as a table in Flink to search for data.
write to straming tables
The following figure shows how to write calculated results or data to a Topic in Pulsar.
Pulsar Catalog
Pulsar comes with many features of enterprise flows. Pulsar’s topic (e.g. Persistent ://tenant_name/namespace_name/topic_name) is not a tiled concept, but has many levels. There is a Tenant level, and there is a Namespace level. This can be easily combined with the concept of Catalog commonly used by Flink.
As shown in the figure below, a Pulsar Catalog is defined, Database is TN/NS, which is a path representation that begins with a tenant, then a namespace, and finally a topic. In this way, the namespace of Pulsar can be regarded as the Catalog of Flink. There are many topics under namespace, and each topic can be the table of Catalog. This easily corresponds well to the Flink Cataglog. In the figure below, the top is the definition of the Catalog, and the bottom shows how to use it. However, there is still some work to be done here, and there are plans to do partition support later.
FLIP-27
The FLIP-27 is a representative of the Pulsar – Flink Batch Stream Fusion. We saw earlier that Pulsar provides a unified view that manages metadata for all topics. In this view, the information of each shard is marked according to metadata, and the purpose of batch stream fusion is achieved by relying on Flip-27 framework. There are two concepts in the Flip-27: Splitter and Reader.
The way it works is that a splitter splits the data source and then passes it to a reader to read. For Pulsar, splitter still deals with a topic of Pulsar. After capturing the metadata of the Pulsar topic, we can determine where the shard is stored according to the metadata of each shard, and then select the most appropriate reader for access. Pulsar provides a unified storage layer, and Flink selects different readers to read the data in Pulsar based on the different locations and formats of the splitter information for each partition.
The Source high concurrency
Another closely related aspect of Pulsar’s consumption pattern. The problem many Flink users face is how to make Flink perform tasks faster. For example, if a user gives 10 levels of concurrency, it will have 10 concurrent jobs, but if a Kafka topic has only 5 partitions, since each partition can only be consumed by one job, then 5 Flink jobs will be free. If you want to speed up the concurrency of consumption, you can only coordinate with the business side to open a few more partitions. In this case, from the consumption end to the production end and behind the operation and maintenance side will feel particularly complicated. And it’s hard to update on demand in real time.
Pulsar supports not only Kafka, where each partition can be consumed by only one active consumer, but also the key-shared model, where multiple consumers can consume a partition together. We also ensure that each key message is sent to only one consumer, which ensures concurrency among consumers and order of messages at the same time.
For the previous scenario, we supported key-shared consumption pattern in Pulsar Flink. Again, 5 partitions, 10 concurrent Flink jobs. But I can break down the range of keys into 10. Each Flink subtask is consumed in one of 10 key ranges. This decouples the number of partitions from Flink concurrency and provides better data concurrency from the consumer side.
Automatic Reader selection
Another direction is that Pulsar as mentioned above already has a unified storage basis. We can select different readers based on the segment metadata of the user. So far, we have implemented this feature.
The recent work
Recently, we have also been working on the Flink 1.12 integration. The Pulsar Flink project has also been iterating, such as the addition of transaction support in Pulsar 2.7 and the integration of end-to-end Exist-Once into the Pulsar Flink Repo. The other work is how to read the column data of the secondary storage in PARQUET format; And use Pulsar storage layer to do the state storage of Flink.