Author: Hang Chen, team lead of the BIGO big data messaging platform. Layout for this issue: Tango@StreamNative.

About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation cloud-native distributed message streaming platform that integrates messaging, storage, and lightweight functional computing. It adopts a compute-storage separation architecture and supports multi-tenancy, persistent storage, and multi-datacenter cross-region data replication, offering strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

Background

In our last blog, we discussed some of the problems BIGO encountered while performance tuning Pulsar brokers and suggested solutions. In this blog, we discuss BIGO’s performance tuning work on BookKeeper, the distributed storage service underlying Pulsar.

For BIGO, Apache Pulsar had the following performance issues on the bookie side (a single storage node of BookKeeper):

  1. Read requests take a long time and queue up heavily.
  2. Bookie runs out of direct memory (OOM), causing the process to hang.
  3. Broker direct memory OOM.
  4. When the journal disk is an HDD, the bookie add entry 99th percentile latency is still very high and write performance is poor, even with fsync turned off.
  5. When a large number of read requests hit a bookie, write backpressure occurs and add entry latency increases. This is more pronounced when the ledger disk is an HDD.

BookKeeper’s stability, high throughput, and low latency are the cornerstones of Pulsar’s stability and throughput. Based on the fundamentals of BookKeeper, this article introduces the factors that affect read/write throughput and stability, and covers bookie performance tuning in the following six areas:

  • Pulsar Topic Message write/read process
  • BookKeeper IO request tuning
  • Ledger memtable flush policy tuning
  • Journal flush policy tuning
  • Tuning Entry read performance
  • GarbageCollector (GC) performance optimization

Environment deployment and monitoring

Before introducing BookKeeper performance tuning, we need to add detailed monitoring metrics to the system and understand the meaning of each metric and the relationships between them. We have already described the environment deployment and monitoring in detail in Apache Pulsar Performance Tuning for BIGO (Part 1), so we won’t repeat the details here.

Pulsar Topic Message write/read process

To make the description easier to understand, we will first cover the whole picture of message flow from a Pulsar Topic message write/read perspective, and then take a closer look at the internal implementation of Bookie.

Pulsar uses dbLedgerStorage by default, so this blog chooses the implementation of dbLedgerStorage to explain.

Topic Message writing process

When a client writes an entry to BookKeeper (each entry has a unique <ledgerId, entryId> tuple identifier), the process is as follows (the journal write is omitted here for brevity):

  1. The entry is placed in the Netty thread processing queue and waits for the Netty thread to process it.

  2. The Netty thread takes each entry from the queue in turn, computes the modulo of the entry’s ledgerId, and selects the target disk (ledger disk) to write to. The modulo algorithm is ledgerId % numberOfDirs, where numberOfDirs is the number of ledger directories configured for the bookie process (see the sketch after this list).

  3. After the target disk object is selected, the index is written to the cache and to RocksDB for persistent storage, and the payload is written to the memtable (a dual memory buffer) for sorting and flushing.

  4. When a memtable buffer is full, a flush is triggered: the payload is flushed into the PageCache, and the PageCache is later flushed back to disk.
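
To make the routing in step 2 concrete, here is a minimal sketch in Java of the modulo-based ledger disk selection (illustrative class and method names, not BookKeeper source):

```java
// A minimal sketch (not BookKeeper source) of step 2: route an entry to a
// ledger disk by taking ledgerId modulo the number of configured ledger directories.
public class LedgerDiskRouter {
    private final int numberOfDirs; // number of ledger directories configured for the bookie

    public LedgerDiskRouter(int numberOfDirs) {
        this.numberOfDirs = numberOfDirs;
    }

    // Index of the ledger directory (disk) an entry of this ledger is written to.
    public int selectDir(long ledgerId) {
        return (int) (ledgerId % numberOfDirs);
    }

    public static void main(String[] args) {
        LedgerDiskRouter router = new LedgerDiskRouter(4);
        // Entries of ledger 10 always land on the same disk (10 % 4 = 2), so a single
        // hot ledger cannot be spread across disks.
        System.out.println(router.selectDir(10L)); // prints 2
    }
}
```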

Topic Message reading process

When a client needs to read an entry (each entry has a unique <ledgerId, entryId> tuple identifier), the process is as follows:

  1. Obtain the metadata of the ledger containing the entry from ZooKeeper. The metadata stores the addresses of the bookie nodes that hold the ledger replicas, for example Ensembles: [bookie1, bookie2].

  2. Send a read entry request to one of the bookies (the client’s fault-tolerance and bookie-selection strategies are omitted here for brevity).

  3. After bookie1 receives the read entry request, it hashes the ledgerId, selects the corresponding reader thread, and places the request into that reader thread’s processing queue.

  4. The reader threads in turn take requests from their queues, compute ledgerId modulo the number of ledger directories, and select the disk on which the ledger resides.

  5. After the target disk object is selected, check whether the memtable or readAheadCache has cached the target entry. If so, return it directly. Otherwise, read the RocksDB index, then read the target data from disk and load it into the readAheadCache (a sketch of this lookup order follows the list).
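
To make the lookup order in step 5 concrete, here is a minimal sketch, with simplified in-memory stand-ins for the memtable, read-ahead cache, and RocksDB index (not the dbLedgerStorage code):

```java
// A minimal sketch of the bookie read path described above:
// memtable -> readAheadCache -> index lookup + disk read + cache fill.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReadPathSketch {
    private final Map<String, byte[]> memtable = new ConcurrentHashMap<>();
    private final Map<String, byte[]> readAheadCache = new ConcurrentHashMap<>();

    public byte[] readEntry(long ledgerId, long entryId) {
        String key = ledgerId + ":" + entryId;
        byte[] entry = memtable.get(key);             // 1. still buffered in the memtable?
        if (entry != null) return entry;
        entry = readAheadCache.get(key);              // 2. already prefetched?
        if (entry != null) return entry;
        long offset = lookupIndex(ledgerId, entryId); // 3. index lookup (RocksDB in the real code)
        entry = readFromEntryLog(offset);             // 4. read from the entry log on disk
        readAheadCache.put(key, entry);               // 5. populate the read-ahead cache
        return entry;
    }

    private long lookupIndex(long ledgerId, long entryId) { return 0L; } // placeholder
    private byte[] readFromEntryLog(long offset) { return new byte[0]; } // placeholder
}
```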

This is the main logic for writing and reading messages. So what are the details of message processing? When message writes or reads become slow in our production system, how can we quickly locate the problem and optimize accordingly? This requires a deep understanding of BookKeeper’s IO model and, when necessary, tuning in conjunction with the Linux IO stack.

BookKeeper IO request tuning

BookKeeper uses Netty to process IO requests, including the add entry and read entry request flows. This section walks through each step, along with the names and meanings of the monitoring metrics for each step, and then presents the solutions to the performance problems encountered in the BIGO production environment.

The following monitoring metrics are used in the BookKeeper IO request process:

  • bookkeeper_server_BookieReadThreadPool_queue_[0..16]: the number of requests queued in each read thread’s queue;

  • bookkeeper_server_READ_ENTRY_REQUEST: the latency of a request from entering the request queue to finishing processing;

  • bookkeeper_server_BookieReadThreadPool_total_tasks_0: the number of requests being processed by the read thread (a measure of read throughput);

  • bookie_read_entry: the time taken from the start to the end of read request processing.

Following the steps shown in the diagram, here is a brief overview of the process from the BookKeeper client initiating a request to the bookie processing it:

  1. When the Bookie server starts, the Netty server (epoll) is started first. By default, twice as many threads as CPU cores are allocated to handle network requests. Each thread has its own thread channel (default queue length: 10000) that buffers incoming requests, and the server listens on a network port (default: 3181).

  2. When the Netty server receives a request from a client, it hashes the request to the corresponding Netty thread channel based on the client sessionId, where it waits for the corresponding Netty thread to process it.

  3. Each network processing thread takes the pending requests from its own thread channel and runs them through the following pipeline: LengthFieldBasedFrameDecoder, LengthFieldPrepender, RequestDecoder, ResponseEncoder, ServerSideHandler, requestHandler. The requestHandler does the actual request processing, while the others are pre-processing steps such as frame decoding.

  4. The BookieRequestProcessor#processRequest method dispatches to different methods according to the request type: ADD_ENTRY, READ_ENTRY, FORCE_LEDGER, AUTH, WRITE_LAC, READ_LAC, GET_BOOKIE_INFO, START_TLS, etc. This is the general entry point for the Bookie server to handle the different request types (only the most common ADD_ENTRY and READ_ENTRY are analyzed here).

  5. ADD_ENTRY

    • The request and channel are passed to WriteEntryProcessorV3, the corresponding instance is created, and its run method is invoked.

    • The run method first calls the addEntry method to write the entry to the bookie and obtain the processing result.

    • The processing result is encapsulated in sendResponse, written out to the Netty thread channel via writeChannel, and sent to the client.

  6. READ_ENTRY

    • The read request is encapsulated as a ReadEntryProcessorV3 instance, and a reader thread is selected by taking ledgerId modulo the thread pool size. The read request is added to that thread’s processing queue (each thread has a separate queue with a default length of 2500; if the queue is full, the add blocks). A sketch of this dispatch follows the list.

    • The reader thread keeps pulling read requests from its own queue and calls the safeRun method of the ReadEntryProcessorV3 instance to read the data.

    • Reading the data means selecting the corresponding ledger instance according to ledgerId, reading the entry data back, and putting it into the readResponse.

    • The read response is returned to the Netty thread channel, which then sends it to the client.

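Here is a minimal sketch of the READ_ENTRY dispatch described above (illustrative class and field names, not BookKeeper source): all requests for a given ledger are routed to a fixed reader thread, each with its own bounded queue, which is why a single hot ledger can queue up heavily on one thread.

```java
// A minimal sketch (illustrative names, not BookKeeper source) of the READ_ENTRY
// dispatch: pick a reader thread by ledgerId modulo the pool size and enqueue the
// request into that thread's bounded queue.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ReaderDispatchSketch {
    private final BlockingQueue<Runnable>[] queues;

    @SuppressWarnings("unchecked")
    public ReaderDispatchSketch(int numReaderThreads, int maxPendingPerThread) {
        queues = new BlockingQueue[numReaderThreads];
        for (int i = 0; i < numReaderThreads; i++) {
            // maxPendingPerThread plays the role of maxPendingReadRequestsPerThread.
            queues[i] = new ArrayBlockingQueue<>(maxPendingPerThread);
            final int idx = i;
            Thread reader = new Thread(() -> {
                while (true) {
                    try {
                        queues[idx].take().run(); // pull the next read request and process it
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, "reader-" + i);
            reader.setDaemon(true);
            reader.start();
        }
    }

    // All requests for one ledger always land on the same reader thread, which is why a
    // single hot ledger can cause heavy queueing on one thread. put() blocks when the
    // queue is full, as described above; capping the queue and returning an error
    // instead is one of the mitigations discussed later.
    public void submit(long ledgerId, Runnable readRequest) throws InterruptedException {
        int idx = (int) (ledgerId % queues.length);
        queues[idx].put(readRequest);
    }
}
```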
During stress testing, we found that some ledgers were slow to read, and that some reader threads were heavily queued.

Combined with the READ_ENTRY processing model, ledger reads are dispatched by taking ledgerId modulo the total number of reader threads. There are two ways to mitigate this problem:

  • Increase the number of reader threads to reduce the processing pressure on each thread.

  • Add partitions to the topic to spread the read pressure.

Since all read and write requests are added to their corresponding queue before being processed (the queue lengths are controlled by maxPendingReadRequestsPerThread and maxPendingAddRequestsPerThread), slow downstream processing can fill the waiting queues and increase the risk of direct memory OOM. The mitigation options are as follows:

  • Control the length of the wait queue and return an error for requests that cannot be processed in time.

  • Speed up Journal write and Ledger write/read processing.

Ledger memtable flush policy tuning

Before data is written to the journal, it must first be written to the memtable successfully. The memtable is designed as a dual buffer; the default size of a single buffer is Direct Memory Size * 1/4 * 1/numberOfLedgers * 1/2.

When we write an entry to the memtable, the entry is written directly into the WriteCache. There are three possible situations (a sketch follows the list):

  • If the WriteCache still has free space, the entry is written directly into the WriteCache and success is returned.

  • If the WriteCache is full but the WriteCacheBeingFlushed is empty, a double-buffer rotation is triggered: the entry is written into the now-empty WriteCache, and an independent thread is started to sort and flush the other buffer.

  • If the WriteCache is full and the WriteCacheBeingFlushed has not finished flushing, entry writes are blocked until the WriteCacheBeingFlushed flush completes.
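
Here is a minimal sketch of the dual-buffer behaviour above, using a plain list as a stand-in for the real cache structure and made-up names (the actual DbLedgerStorage code differs in many details):

```java
// A minimal sketch of the dual-buffer memtable: writes go into writeCache; when it fills
// up, the two buffers are swapped and the full one is sorted and flushed in the
// background; if the previous flush has not finished yet, the write blocks until it does.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DualBufferMemtableSketch {
    private List<byte[]> writeCache = new ArrayList<>();
    private List<byte[]> writeCacheBeingFlushed = new ArrayList<>();
    private final long maxCacheBytes;
    private long cachedBytes = 0;
    private Future<?> pendingFlush;
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();

    public DualBufferMemtableSketch(long maxCacheBytes) {
        this.maxCacheBytes = maxCacheBytes;
    }

    public synchronized void addEntry(byte[] entry) throws Exception {
        if (cachedBytes + entry.length > maxCacheBytes) {
            if (pendingFlush != null) {
                pendingFlush.get(); // case 3: previous flush still running -> writer blocks here
            }
            // Case 2: swap the buffers and flush the full one on a background thread.
            List<byte[]> full = writeCache;
            writeCache = writeCacheBeingFlushed;
            writeCacheBeingFlushed = full;
            cachedBytes = 0;
            pendingFlush = flushExecutor.submit(() -> sortAndFlush(full));
        }
        writeCache.add(entry); // case 1: there is free space in the active buffer
        cachedBytes += entry.length;
    }

    private void sortAndFlush(List<byte[]> cache) {
        // In the real code the cache is sorted by (ledgerId, entryId), the index is written
        // to RocksDB, and the data is written to the entry log (i.e. into the PageCache).
        cache.clear();
    }
}
```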

The WriteCache sort-and-flush process is as follows:

  1. Use a quicksort algorithm to sort the data in the WriteCache (a specially designed hash map). After sorting, entries of the same ledger are placed close together, which helps the OS read ahead.

  2. Write the sorted index to RocksDB.

  3. Flush sorted data into PageCache.

We can configure the flushEntrylogBytes parameter to control how often entries are flushed from PageCache to disk.

Note that when entries in the PageCache are flushed to disk, the data is not evicted from the PageCache. The purpose is to keep a cache for entry reads.
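
A minimal sketch of the threshold-based flush described above, assuming a plain FileChannel as the entry log (the real entry logger is more involved): every write lands in the PageCache, and once roughly flushEntrylogBytes bytes have accumulated, the channel is forced to disk so that the OS never has to write back a huge batch at once.

```java
// A minimal sketch of flushEntrylogBytes-style behaviour: force the accumulated
// PageCache bytes to disk once a configured threshold has been crossed.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class EntryLogFlushSketch {
    private final FileChannel channel;
    private final long flushEntrylogBytes; // bytes written between forced flushes
    private long unflushedBytes = 0;

    public EntryLogFlushSketch(Path entryLogFile, long flushEntrylogBytes) throws IOException {
        this.channel = FileChannel.open(entryLogFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.flushEntrylogBytes = flushEntrylogBytes;
    }

    public void write(ByteBuffer data) throws IOException {
        unflushedBytes += channel.write(data); // lands in the OS PageCache
        if (unflushedBytes >= flushEntrylogBytes) {
            channel.force(false);              // push the PageCache contents to disk
            unflushedBytes = 0;
        }
    }
}
```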

When an HDD is used as the ledger disk, if too much data is flushed from PageCache to disk at once, the disk IO util stays saturated and the PageCache flush slows down. The flush from WriteCache to PageCache then also slows, causing entry writes to be blocked.

Normally, data in the WriteCache is sorted and then flushed to PageCache, and the PageCache flush is asynchronously decoupled from the WriteCache flush, so a slow PageCache flush should not affect the WriteCache flush speed. The problem is that all WriteCache flush data ends up in the entry log file. When an entry log file is rolled, it can only be closed after all of its PageCache data has been flushed to disk, and only then is a new entry log file created to receive new data. Therefore, if the PageCache writeback becomes slow, it eventually affects the speed at which the WriteCache flushes to PageCache.

In the catch-up read scenario, the ledger disk also needs to serve entry reads, which are random at the macro level and sequential at the micro level (thanks to sorting). At this point data is both written and read; for the disk, this is a mixed read/write scenario. To preserve read/write throughput, we need to find ways to reduce the frequency of disk reads. This part of the tuning needs to start from the Linux IO stack.

Journal flush policy tuning

The ADD_ENTRY operation ultimately performs the entry write by calling the Bookie#addEntry method. When an entry is written, the memtable is written first, and the journal is written after that succeeds. This process is serial.

Assuming we have multiple journal directories configured, which journal do we write to? The journal instance is selected by ledgerId % numberOfJournalDirs. The add request (including the data payload) is then placed in that journal’s queue, to be processed by a dedicated journal thread. Users can configure the numJournalCallbackThreads parameter in bookkeeper.conf to control the number of journal callback threads; the default is 8.

After a write request is placed in the journal queue, the process is as follows:

  1. The journal instance has an independently running background thread that continually pulls QueueEntry items from the queue for the following processing (if there is no QueueEntry in the queue, it blocks).

  2. Strip the entry data out of the QueueEntry and write it to the BufferedChannel (each journal file has a BufferedChannel). When the previous journal file rolls, the next journal file must be created and associated with a new BufferedChannel.

  3. Put the request metadata, with the entry data stripped out, into the toFlush queue. The toFlush queue is flushed if either of the following conditions is met:

    • The wait time of entries in the toFlush queue exceeds the threshold.
    • The toFlush queue size exceeds the buffered-bytes (buffWrite) or buffered-entries threshold.
  4. Flush data to PageCache.

    • Flush data from BufferedChannel into OS PageCache.

    • If any of the following conditions is met, a forceWriteRequest is generated and added to the forceWriteRequests queue (in BookKeeper 4.9.2, a flush happens if either of the first two conditions is met; in BookKeeper 4.10, a flush happens every time; on the current master branch, a flush happens if any of the following conditions is met):

      • journalSyncData is enabled.

      • The journal file has reached its maximum size and needs to roll.

      • The maximum interval since the last PageCache flush has been reached (1 s by default).

  5. Flush the PageCache data to disk. The forceWriteRequests queue has a separate background thread that continuously takes forceWriteRequest items from the queue and flushes the PageCache data to disk. The ADD_ENTRY ACK timing comes into play here; the main difference is whether journalSyncData is enabled (Pulsar enables it by default, i.e. an ACK is returned after each flush to disk). A sketch of this decision follows the list.

    • If journalSyncData is enabled, the ACK is returned only after the flush to disk (step 5).

    • If journalSyncData is disabled, a callback is executed after the flush to PageCache (step 4) and the ACK is returned.
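
To summarize steps 4 and 5, here is a minimal sketch of the force-write decision and the ACK point (illustrative names and simplified conditions, not the Journal implementation):

```java
// A minimal sketch of the force-write decision: data is first written to the journal
// file (PageCache), and a force-write (flush to disk) request is queued when sync-data
// is enabled, the journal file needs to roll, or the flush interval has elapsed. The
// ACK fires after the PageCache flush or after the disk flush, depending on
// journalSyncData.
public class JournalFlushDecisionSketch {
    private final boolean journalSyncData;
    private final long maxJournalSizeBytes;
    private final long maxFlushIntervalMs;
    private long lastForceWriteMs = System.currentTimeMillis();

    public JournalFlushDecisionSketch(boolean journalSyncData,
                                      long maxJournalSizeBytes,
                                      long maxFlushIntervalMs) {
        this.journalSyncData = journalSyncData;
        this.maxJournalSizeBytes = maxJournalSizeBytes;
        this.maxFlushIntervalMs = maxFlushIntervalMs;
    }

    /** Returns true if a forceWriteRequest should be queued after a PageCache flush. */
    public boolean shouldForceWrite(long currentJournalSizeBytes, long nowMs) {
        boolean rollNeeded = currentJournalSizeBytes >= maxJournalSizeBytes;
        boolean intervalElapsed = nowMs - lastForceWriteMs >= maxFlushIntervalMs;
        if (journalSyncData || rollNeeded || intervalElapsed) {
            lastForceWriteMs = nowMs;
            return true;
        }
        return false;
    }

    /** Where the ACK is sent: after the disk flush, or already after the PageCache flush. */
    public String ackPoint() {
        return journalSyncData ? "after flush to disk (step 5)" : "after flush to PageCache (step 4)";
    }
}
```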

To more clearly identify bottlenecks during journal writes, we need to understand the meaning of each monitoring metric and configure the Grafana dashboards so that problems can be located quickly. The following figure shows the length of each queue and the processing time of each stage.

Note that:

  • The BufferedChannel is directly associated with the journal transaction file. If no new journal file can be created, journal add request processing is blocked. If the IO util of the journal disk stays saturated, creating the new journal file will be slow (long IO wait) and the journal write time goes up. Watch the following metric: bookie_journal_JOURNAL_CREATION_LATENCY.

  • When the journal disk is an HDD, the journalSyncData switch is turned off so that an ACK is returned as soon as data is written to PageCache, reducing entry write latency. However, after turning off journalSyncData, we found that add entry 99th percentile latency was very unstable, sometimes reaching a few seconds or even more than ten seconds. This is caused by disk IO jitter from the PageCache writeback. Data in the PageCache is flushed back to disk at three points:

    • The OS writes back dirty pages every 30 seconds.

    • The proportion of dirty pages in the PageCache exceeds the threshold, triggering a writeback.

    • Rolling the file triggers a writeback.

What these three triggers have in common is that a large amount of data is flushed at once. As a result, the HDD’s IO util stays saturated for a short period, and other IO requests issued by the kernel queue up in the scheduling queue, including the new journal file create request. To address this, in PR 2287 (github.com/apache/book…) we control the amount of data flushed from PageCache to disk at a time, and thus the time during which the disk IO util is saturated. In BIGO’s stress testing practice, with this policy enabled, HDDs achieve performance similar to SATA SSDs.

Since all journal write requests are first placed in the journal queue, if the downstream processing slows down, a large number of add requests may accumulate in the journal queue and consume a large amount of memory, increasing the risk of direct memory OOM. The mitigation options are as follows:

  • Use the journalQueueSize parameter to control the size of the Journal Queue, which defaults to 10000.

  • Speed up Journal thread data writing processing speed.

Tuning Entry read performance

The Entry reading process is shown in the following figure. The input parameter is (ledgerId, entryId).

  1. According to ledgerId % numberOfLedgers, select the target ledger disk instance where the entry resides.

  2. Check whether the entry to be read exists in the memtable (writeCache and writeCacheBeingFlushed) of this ledger instance (with <ledgerId, entryId> as the key; the time complexity is O(1)). If the cache hits, the entry is returned.

  3. Check whether the entry to be read exists in the read cache (with <ledgerId, entryId> as the key, queried from the index; the time complexity is O(1)). If the read cache hits, the entry is returned.

  4. If the read cache also misses, the data is read from disk, read-ahead is started (read-ahead reads one entry at a time, multiple times), and all of the read entries are added to the read cache. The entire read-ahead process is serial, and the number of read-ahead entries is controlled by dbStorage_readAheadCacheBatchSize. The larger this parameter, the higher the long-tail latency on a read cache miss.

The prefetch process is as follows (a sketch of the read-ahead batching appears after these steps):

  1. Get the BufferedLogChannel instance of the entry log file corresponding to the ledgerId, and check whether the entry to be read is in the BufferedLogChannel.

    • If so, the entry is returned directly.

    • If not, open a RandomAccessFile with an internalNioBuffer and read a batch of data from the file into the readBuffer. The batch size is controlled by the readBufferSizeBytes parameter.

  2. Cache the pre-read entries in the BufferedLogChannel cache and return the result.
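
Here is a minimal sketch of the read-ahead batching (placeholder I/O and a plain map instead of the real read cache; the batch size plays the role of dbStorage_readAheadCacheBatchSize):

```java
// A minimal sketch of the read-ahead described above: on a read-cache miss, read a
// batch of consecutive entries starting at the requested one and load them all into
// the read cache.
import java.util.HashMap;
import java.util.Map;

public class ReadAheadSketch {
    private final Map<String, byte[]> readCache = new HashMap<>();
    private final int readAheadBatchSize;

    public ReadAheadSketch(int readAheadBatchSize) {
        this.readAheadBatchSize = readAheadBatchSize;
    }

    public byte[] readWithReadAhead(long ledgerId, long entryId) {
        byte[] hit = readCache.get(key(ledgerId, entryId));
        if (hit != null) {
            return hit;
        }
        // Cache miss: serially read the next readAheadBatchSize entries from the entry
        // log and fill the read cache. A larger batch means a longer stall for the
        // request that missed, i.e. higher long-tail latency.
        byte[] requested = null;
        for (int i = 0; i < readAheadBatchSize; i++) {
            byte[] entry = readEntryFromLog(ledgerId, entryId + i); // placeholder disk read
            readCache.put(key(ledgerId, entryId + i), entry);
            if (i == 0) {
                requested = entry;
            }
        }
        return requested;
    }

    private String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    private byte[] readEntryFromLog(long ledgerId, long entryId) {
        return new byte[0]; // stand-in for reading from the entry log / PageCache
    }
}
```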

During read-ahead, if the hot data is still in the PageCache, it is read directly from the PageCache; otherwise, it is read from disk. When data is read from disk, the OS performs its own read-ahead and caches what it reads in the PageCache. Since bookie does not rely entirely on the PageCache for cache hits, PageCache pollution from catch-up reads has less of an overall impact.

When reading an entry, we need to read it based on the offset (index) of the target entry in the entry log file. To speed up index reads, bookie stores the index in RocksDB. We need to ensure a high hit ratio for index queries in RocksDB, so the RocksDB cache size configuration becomes key; the related parameter is dbStorage_rocksDB_blockCacheSize.

GarbageCollector (GC) performance optimization

When a bookie processes data writes, it sorts all entries written within the same period of time (entries that may belong to multiple ledgers) into the same entry log file and stores the indexes in RocksDB. This presents a problem: when some topic data expires or is deleted, the data of all the associated ledgers should be cleaned up. However, since multiple ledgers are written into the same entry log file over the same time period, the cleanup becomes slightly more complicated.

BookKeeper’s solution is to start a separate GarbageCollector (GC) thread to do the cleanup. GC is divided into minorCompaction and majorCompaction, which differ in their thresholds and intervals. By default, minorCompaction runs every hour with a threshold of 0.2, while majorCompaction runs every 24 hours with a threshold of 0.5. The threshold is the proportion of valid data remaining in an entry log file. When data in a topic expires, the ledger metadata in ZooKeeper is marked as expired; the GC thread periodically scans the ledgers in each entry log file for expiration and calculates the proportion of valid data remaining.
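
The decision itself is simple. The following sketch, with the default values quoted above and illustrative names, shows when an entry log file becomes a compaction candidate:

```java
// A minimal sketch of the compaction decision: an entry log file is compacted when the
// fraction of still-valid data it contains falls below the threshold of the compaction
// pass that is currently running.
public class CompactionDecisionSketch {
    static final double MINOR_COMPACTION_THRESHOLD = 0.2; // runs every hour by default
    static final double MAJOR_COMPACTION_THRESHOLD = 0.5; // runs every 24 hours by default

    /** usableRatio = remaining valid bytes / total bytes of the entry log file. */
    static boolean shouldCompact(double usableRatio, boolean isMajorCompaction) {
        double threshold = isMajorCompaction ? MAJOR_COMPACTION_THRESHOLD
                                             : MINOR_COMPACTION_THRESHOLD;
        return usableRatio < threshold;
    }

    public static void main(String[] args) {
        // An entry log with 35% valid data survives minor compaction (0.35 >= 0.2)
        // but is rewritten by major compaction (0.35 < 0.5).
        System.out.println(shouldCompact(0.35, false)); // false
        System.out.println(shouldCompact(0.35, true));  // true
    }
}
```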

GC reads each entry in the entry log file one by one and checks whether it has expired. If it has expired, it is discarded; otherwise, it is written to a new entry log file and the entry’s index in RocksDB is updated.

During GC, reading the entries of an entry log file is sequential for the disk. However, if a large amount of data is being written at the same time, the performance of mechanical disks deteriorates sharply in this mixed read/write scenario. The symptom is read/write throughput jitter when GC runs on a bookie.

To reduce the impact of GC, BookKeeper provides two throttling policies: throttling by entries and throttling by bytes. By default, throttling is by entries, i.e. the maximum number of entries read per second is controlled by the compactionRate parameter (default 1000). Throttling can also be done by bytes, i.e. the number of bytes read per second is controlled by isThrottleByBytes and compactionRateByBytes (default 1000000).

The amount of data carried by each entry varies, so throttling by entry count may still cause jitter in the amount of data read from disk, affecting normal reads and writes. Therefore, throttling by bytes is recommended.
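
For illustration, here is a minimal sketch of a byte-throttled compaction loop in the spirit of isThrottleByBytes/compactionRateByBytes (placeholder I/O and illustrative names, not the BookKeeper compactor):

```java
// A minimal sketch of a byte-throttled compaction loop: expired entries are dropped,
// valid ones are rewritten to a new entry log, and the loop sleeps whenever it has read
// more bytes than the per-second budget allows.
import java.util.List;

public class ThrottledCompactionSketch {
    private final long compactionRateByBytes; // max bytes read per second

    public ThrottledCompactionSketch(long compactionRateByBytes) {
        this.compactionRateByBytes = compactionRateByBytes;
    }

    public void compact(List<byte[]> oldEntryLog) throws InterruptedException {
        long windowStart = System.currentTimeMillis();
        long bytesInWindow = 0;
        for (byte[] entry : oldEntryLog) {
            bytesInWindow += entry.length;
            if (bytesInWindow >= compactionRateByBytes) {
                // The budget for this one-second window is used up: wait out the rest of it.
                long elapsed = System.currentTimeMillis() - windowStart;
                if (elapsed < 1000) {
                    Thread.sleep(1000 - elapsed);
                }
                windowStart = System.currentTimeMillis();
                bytesInWindow = 0;
            }
            if (!isExpired(entry)) {
                writeToNewEntryLog(entry); // and update the entry's index in RocksDB
            }
        }
    }

    private boolean isExpired(byte[] entry) { return false; } // placeholder
    private void writeToNewEntryLog(byte[] entry) { }         // placeholder
}
```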

Linux IO stack optimization

For the Linux IO protocol stack, we recommend tuning in the following ways:

  1. Use the CFQ (Completely Fair Queuing) scheduler for HDDs and the NOOP scheduler for SSDs.

  2. Use OpenCAS-based SSD caching to accelerate reads. At the cache layer, different replacement strategies are used to reduce the probability of read requests sinking down to the ledger disk and to keep ledger disk reads and writes as sequential as possible, thereby improving read/write throughput. BIGO is carrying out the related stress testing, and progress will be shared in due course.

Conclusion

This article has introduced BookKeeper’s message processing mechanics from the perspective of message writes and reads and detailed the key points that affect performance. Along the way, we focused on the monitoring metrics of each stage to explain how things work. When you encounter performance or stability problems, you can quickly locate them with these metrics and then analyze and handle them based on the underlying principles.

With mechanical hard drives, optimizing IO performance is the key to ensuring BookKeeper’s stability and throughput. BookKeeper is designed to write the journal and the ledger as sequentially as possible, but it cannot avoid mixed reads and writes in scenarios such as catch-up reads, compaction, and auto recovery, and mechanical drives degrade dramatically under mixed reads and writes. For compaction and auto recovery, throttling can mitigate the problem. For catch-up reads, SSD caching can improve the read hit ratio and reduce the chance that random IO sinks down to the mechanical drive. The BIGO message queue team continues to work on a series of optimizations at the IO layer, customizing caching strategies to ensure system throughput and stability.

About the author

Hang Chen is the leader of BIGO’s big data messaging platform team, responsible for the creation and development of a centralized publish-subscribe messaging platform for large-scale services and applications. He introduced Apache Pulsar to the BIGO messaging platform and connected with upstream and downstream systems such as Flink, ClickHouse, and other real-time recommendation and analytics systems. He is currently focusing on Pulsar performance tuning, new feature development and Pulsar ecosystem integration.