To kick things off with something unrelated to tech: the movie "One Breath, One Breath" is worth a look! (Relax a little between study sessions.)

This article is quite long, so I plan to update it in several installments. You can bookmark it first and read it at your leisure.

P.S. Given the author's limited time and knowledge, this article will be updated and deepened over time. Above all, I want readers to think about Kafka and ES in relation to each other, considering the similarities and differences between the two architectures.

Kafka and ElasticSearch (ES) should be familiar to most readers; I have already introduced both of them in several earlier articles.

A lot of people wonder why I am comparing these two at all: they are unrelated and point in different directions, one being a message queue and the other a storage and search engine, so on the face of it they should not be compared. But as the title says, this article compares them in terms of design principles and architecture. Both are hot "players" in their respective fields, and I think comparing their designs and implementations is meaningful, and helpful for our own architectural work. All right, let's get down to brass tacks.

Overview

Kafka and ES are both very common components in today's distributed systems, and each is excellent in its own field. So first, a quick introduction to their respective features.

  • Kafka

    Kafka is a high-throughput distributed publish-subscribe messaging system capable of handling all the activity stream data of a consumer-scale website.

    • Message persistence is provided through an O(1) disk data structure, which maintains stable performance over long periods even with terabytes of stored messages.
    • High throughput: even on very modest hardware, Kafka can support millions of messages per second.
    • Supports partitioning messages across Kafka brokers while consuming with clusters of consumer machines.
    • Supports parallel data loading into Hadoop, and integrates with Spark and Flink for real-time stream computing.
  • ES

    • Distributed real-time file storage where each field is indexed and searchable
    • Distributed real-time analysis search engine
    • Process petabytes of structured or unstructured data

    Applicable scenarios: detail queries, filtering, and sorting.

    Not applicable scenarios:

    1. Aggregation over a large data volume (total indexed documents >= 5,000,000) with high cardinality (buckets >= 1,000)
    2. Deduplication over a large data volume (total indexed documents >= 5,000,000) with high cardinality (unique values >= 10,000); the deduplication results are also inaccurate
    3. High-cardinality fuzzy queries (total indexed documents >= 5,000,000)
    4. Join queries: ES is not good at joins
    5. Multi-dimensional composite aggregation (dimensions >= 5): serious performance problems at high cardinality
    6. Data analysis: not suitable for offline computation

    Bucket-count estimation: assuming fields A and B each have 10 distinct values, grouping by both fields simultaneously eventually produces 10 × 10 = 100 buckets (the Cartesian product of each field's distinct-value count is the final bucket count).
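To make the bucket-count estimate above concrete, here is a minimal sketch in plain Java (the field cardinalities are hypothetical example values, not measured data):

```java
// Minimal sketch: estimating the bucket count of a multi-field aggregation.
public class BucketEstimate {
    public static void main(String[] args) {
        long[] fieldCardinalities = {10, 10, 50}; // distinct values of fields A, B, C
        long buckets = 1;
        for (long c : fieldCardinalities) {
            buckets *= c; // Cartesian product: each extra grouping field multiplies buckets
        }
        System.out.println("Estimated buckets: " + buckets); // 10 * 10 * 50 = 5000
    }
}
```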

Master election in a distributed environment

As distributed data services, Kafka and ES are both highly available and scalable, so each of them necessarily runs on multiple nodes. For so many nodes to work together, there must be a single coordinating node that manages the cluster. This coordinating node is called the controller in Kafka, and the master in ES.

Kafka controller election

Kafka uses the ZooKeeper component to manage cluster metadata, and controller information is part of that metadata, so Kafka's controller election is implemented through ZK. All brokers in the cluster try to register an ephemeral node at ZK's /controller path. The first broker to succeed becomes the controller, and the rest register watchers on the node. If the controller goes offline or crashes, the other brokers in the cluster sense it instantly and re-run the election.
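A minimal sketch of this pattern using the Apache ZooKeeper client (this is illustrative, not Kafka's actual controller code; the connect string and broker ID are placeholders):

```java
import org.apache.zookeeper.*;

// Sketch of controller election via an ephemeral znode, modeled on Kafka's
// /controller pattern.
public class ControllerElection implements Watcher {
    private final ZooKeeper zk;
    private final String brokerId;

    public ControllerElection(String connect, String brokerId) throws Exception {
        this.zk = new ZooKeeper(connect, 3000, this); // placeholder connect string
        this.brokerId = brokerId;
    }

    public void tryBecomeController() throws Exception {
        try {
            // Ephemeral node: disappears automatically if this broker's ZK session dies.
            zk.create("/controller", brokerId.getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(brokerId + " is now the controller");
        } catch (KeeperException.NodeExistsException e) {
            // Someone else won; watch the node so we can re-elect when it is deleted.
            zk.exists("/controller", this);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted
                && "/controller".equals(event.getPath())) {
            try { tryBecomeController(); } catch (Exception ignored) { }
        }
    }
}
```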

ES master election

The ES configuration file has a property node.master; if it is true, the node is eligible to become master. The principle of ES master election is then: the master-eligible nodes ping each other, each votes for the eligible node with the smallest ID it has found, and when a node receives more than half of the votes, it becomes the master and broadcasts this to all nodes in the cluster.
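A highly simplified sketch of this "vote for the smallest ID, win with a majority" idea (plain Java; real ES election also handles quorum configuration, node states, and fault detection):

```java
import java.util.*;

// Simplified model of ES's Bully-style election: every master-eligible node
// votes for the smallest node ID it can see; a node needs a majority to win.
public class MasterElection {
    public static Optional<String> elect(List<String> visibleEligibleIds, int clusterSize) {
        if (visibleEligibleIds.isEmpty()) return Optional.empty();
        // Each node votes for the smallest ID among the nodes it successfully pinged.
        String candidate = Collections.min(visibleEligibleIds);
        // In this toy model every visible node votes the same way.
        int votes = visibleEligibleIds.size();
        return votes > clusterSize / 2 ? Optional.of(candidate) : Optional.empty();
    }

    public static void main(String[] args) {
        List<String> visible = Arrays.asList("node-3", "node-1", "node-2");
        System.out.println(elect(visible, 5)); // Optional[node-1]: 3 of 5 votes, majority
    }
}
```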

The above is a brief introduction to controller/master election in Kafka and ES. Kafka's approach is very simple because it delegates to ZK, while ES implements a Bully-style algorithm itself. For details, see my analysis of the ES master election process.

The data model

Kafka data is grouped into partitions by topic; ES data is grouped into shards by index/type. From this point of view they are very similar, and both use a master-slave pattern: a partition and a shard each have a primary copy, and then multiple secondary copies are created according to the replication factor.

Primary copy election

The two are similar.

Kafka leader election

Kafka uses a preferred-replica policy to elect the leader (primary copy) of each partition:

When a topic is created, an AR (Assigned Replicas) set is automatically created for each partition. This AR set does not change unless new partitions are added, even in exceptional cases such as a replica going offline. In addition there is the ISR (In-Sync Replicas) set: the set of replicas currently in sync with the leader's data, which changes with conditions. Each time a partition leader is elected, replicas are taken from the AR set in order and checked for membership in the ISR; the first replica found in the ISR wins the election, and the leader information is then synchronized to ZK.
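A minimal sketch of this selection rule (plain Java; real Kafka also handles unclean leader election and controller-side bookkeeping):

```java
import java.util.*;

// Sketch of Kafka's leader election rule: walk the AR list in order and
// pick the first replica that is also in the ISR.
public class PartitionLeaderElection {
    public static Optional<Integer> electLeader(List<Integer> ar, Set<Integer> isr) {
        for (int replica : ar) {
            if (isr.contains(replica)) {
                return Optional.of(replica); // first in-sync replica in AR order wins
            }
        }
        return Optional.empty(); // no in-sync replica: the partition stays leaderless
    }

    public static void main(String[] args) {
        List<Integer> ar = Arrays.asList(1, 2, 3);              // assigned replicas, fixed order
        Set<Integer> isr = new HashSet<>(Arrays.asList(2, 3));  // currently in sync
        System.out.println(electLeader(ar, isr)); // Optional[2]
    }
}
```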

ES

ES introduced the concept of the Allocation ID in version 5.x for primary shard election. Each shard copy has its own unique Allocation ID, and the cluster metadata contains a list recording which shard copies hold the latest data. There is great flexibility in deciding which copy becomes the primary: cluster balancing and other constraints (such as allocation awareness and filters) are taken into account, and it can even be controlled manually, but one principle holds: the primary shard's data must be up to date.

ES also maintains a set of synchronized copies, the in-sync allocation IDs, similar to Kafka's ISR. So the elected primary shard must also be in this in-sync set.

Synchronizing primary and secondary data

Because of the master-slave mode, the secondary copies are spread evenly across the cluster's nodes, which inevitably raises data consistency and availability issues. Kafka and ES each have their own way of dealing with this.

Kafka

Kafka reads and writes data only on the leader replica; follower replicas are used purely for redundancy, so Kafka has no read consistency problem between copies. However, this does not mean a message written by a producer can be read immediately by a consumer: there is the HW (high watermark) problem, which will be explained later.

ES

Like Kafka, ES stores data on shards, and a shard also has multiple copies. But ES directs all write requests to the primary shard, while read requests are load-balanced across all copies of that shard (including the primary). So the question becomes: after data is written to the primary, how do we ensure it is synchronized to all replica shards? Much like Kafka's HW, the primary shard maintains a committed list, recording the operations that all copies have acknowledged. This guarantees that if the primary shard goes offline, the remaining replica shards still hold all committed data and can take over service.
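A minimal sketch of this "committed point" bookkeeping (plain Java; a simplified model, not ES's actual code, in which the primary tracks how far each copy has acknowledged and the committed point is the minimum):

```java
import java.util.*;

// Simplified model of a primary tracking per-copy acknowledged sequence numbers.
// The committed point (cf. Kafka's HW) is the minimum sequence number
// acknowledged by every in-sync copy: everything at or below it remains safe
// even if the primary is lost.
public class CommittedPoint {
    private final Map<String, Long> ackedSeqNo = new HashMap<>();

    public void ack(String copyId, long seqNo) {
        ackedSeqNo.merge(copyId, seqNo, Math::max); // keep the highest ack per copy
    }

    public long committed() {
        return ackedSeqNo.values().stream().mapToLong(Long::longValue).min().orElse(-1L);
    }

    public static void main(String[] args) {
        CommittedPoint cp = new CommittedPoint();
        cp.ack("primary", 7);
        cp.ack("replica-1", 5);
        cp.ack("replica-2", 6);
        System.out.println(cp.committed()); // 5: only ops <= 5 are on every copy
    }
}
```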

How data is stored

At this point a lot of underlying principles come into play, and the two implementations differ greatly; after all, their design goals are completely different. A brief analysis follows, and a separate article will later explain each in depth.

Kafka

As mentioned above, Kafka achieves high throughput, and the way it stores data is one of the reasons.


  • .log: the log file stores the actual message data. Each log file has a size limit; when the limit is reached, a new log file is created, named after the offset of the earliest message in the file, e.g. 00000000000000000000.log.
  • .index: the offset index file, which corresponds to a log file. It has the same base name as its log file, with the extension .index. Instead of indexing every message in the data file, the index file uses sparse storage (a sparse index), creating an index entry for every fixed amount of data. This keeps the index file small enough to be held in memory.
  • .timeindex: the timestamp index file, which also corresponds to a log file, with the same base name and the extension .timeindex. Note that this file was only introduced in version 0.10.1.0.

Kafka writes directly to disk files, but it appends sequentially to the end of the file, which is dozens of times faster than random reads and writes. On top of sequential appends, Kafka uses the page cache, zero-copy, and other techniques to further improve write performance.

To summarize:

Log segments + ordered offsets + a sparse index + binary search + sequential scan make it possible to quickly locate a given message; sequential appends + page cache + zero-copy improve write performance.
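A minimal sketch of the lookup side (plain Java; a toy sparse index, not Kafka's on-disk format): find the nearest index entry at or below the target offset, then scan the log sequentially from that byte position.

```java
import java.util.*;

// Toy model of Kafka's offset lookup: a sparse index maps every Nth message
// offset to its byte position in the log. TreeMap's log-time floor lookup
// stands in for Kafka's binary search over the .index file.
public class SparseIndexLookup {
    // index entries sorted by offset: message offset -> byte position
    private final TreeMap<Long, Long> sparseIndex = new TreeMap<>();

    public void addEntry(long offset, long position) {
        sparseIndex.put(offset, position);
    }

    // Returns the byte position to start the sequential scan from.
    public long startScanPosition(long targetOffset) {
        Map.Entry<Long, Long> floor = sparseIndex.floorEntry(targetOffset);
        return floor == null ? 0L : floor.getValue(); // scan forward from here
    }

    public static void main(String[] args) {
        SparseIndexLookup idx = new SparseIndexLookup();
        idx.addEntry(0, 0);      // e.g. one entry per ~4KB of log data
        idx.addEntry(100, 4096);
        idx.addEntry(200, 8192);
        // To find offset 150: start at byte 4096 and scan messages sequentially.
        System.out.println(idx.startScanPosition(150)); // 4096
    }
}
```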

ES

ES is built on Lucene, and Lucene stores data in segments. What do segments contain, and how do they support ES's search and aggregation? Essentially, the data comes in just two structures (both dense indexes):

  • Inverted index: views documents from the term dimension, recording for each term which documents (docIds) it appears in, along with its frequency (term frequency) and positions (offsets relative to the start of the document); term -> docId
  • Forward index: views terms from the document dimension, recording which terms each document contains, and each term's frequency and positions; docId -> term
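A minimal sketch of the inverted-index idea (plain Java with a toy whitespace tokenizer; real Lucene segments store far richer, compressed postings):

```java
import java.util.*;

// Toy inverted index: term -> (docId -> positions). Real Lucene postings
// also store frequencies and norms, and are compressed on disk.
public class TinyInvertedIndex {
    private final Map<String, Map<Integer, List<Integer>>> postings = new HashMap<>();

    public void addDocument(int docId, String text) {
        String[] terms = text.toLowerCase().split("\\s+"); // toy tokenizer
        for (int pos = 0; pos < terms.length; pos++) {
            postings.computeIfAbsent(terms[pos], t -> new HashMap<>())
                    .computeIfAbsent(docId, d -> new ArrayList<>())
                    .add(pos); // record where the term occurs in this document
        }
    }

    public Set<Integer> search(String term) {
        return postings.getOrDefault(term.toLowerCase(), Map.of()).keySet();
    }

    public static void main(String[] args) {
        TinyInvertedIndex index = new TinyInvertedIndex();
        index.addDocument(1, "kafka is a message queue");
        index.addDocument(2, "es is a search engine");
        System.out.println(index.search("is")); // [1, 2]: term -> docIds
    }
}
```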

Segments do not change after being written to disk. The benefits of immutability are:

  1. No locking is needed when writing, which improves write performance
  2. Read performance is improved: once a file is cached by the OS, the vast majority of read requests hit memory directly with no disk IO
  3. Other caches (query cache, request cache, fielddata cache) stay effective, since the data they are built on never changes

The disadvantages are that a change is not visible until a new segment is built, and that a read request must traverse all segments to filter out the data matching its conditions.

Since segments are immutable, deletions are handled with a .del file in ES that marks which record in which segment has been deleted. For modifications, every record in ES carries version information: when a document is updated, the old version is marked as deleted and a new version is written. The records marked in the .del file are physically removed later, when segments are merged.
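A minimal sketch of this mark-and-merge idea (plain Java; a toy model, not Lucene's actual .del format):

```java
import java.util.*;

// Toy model of delete-by-marking: updates append a new version and mark the
// old one deleted; marked entries are only physically dropped at merge time.
public class SoftDeleteModel {
    record Doc(String id, long version, String source) { }

    private final List<Doc> segment = new ArrayList<>();   // immutable once written
    private final Set<Integer> delFile = new HashSet<>();  // positions marked deleted

    public void index(String id, String source) {
        long nextVersion = 1;
        for (int i = 0; i < segment.size(); i++) {
            if (segment.get(i).id().equals(id) && !delFile.contains(i)) {
                delFile.add(i); // mark old version deleted instead of mutating it
                nextVersion = segment.get(i).version() + 1;
            }
        }
        segment.add(new Doc(id, nextVersion, source));
    }

    public List<Doc> merge() { // merging drops the marked records
        List<Doc> merged = new ArrayList<>();
        for (int i = 0; i < segment.size(); i++) {
            if (!delFile.contains(i)) merged.add(segment.get(i));
        }
        return merged;
    }
}
```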

Writing process

In Kafka, the write process is simply how the cluster appends data sent by producers to local log files. For ES the write process is much more complicated, and the description below does not even include synchronization to replica copies.

Kafka

Messages are appended sequentially to files, and already-written messages cannot be modified. Kafka writes data into the system's page cache and lets the OS decide when to flush the cache to disk; Kafka also offers synchronous flushing and periodic fsync as options.
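For illustration, a hedged sketch of the relevant knobs (Java Properties; flush.messages and flush.ms are real topic-level configs, though the usual recommendation is to leave flushing to the page cache and rely on replication for durability):

```java
import java.util.Properties;

// Sketch: topic-level flush settings that trade throughput for durability.
public class FlushConfigSketch {
    public static Properties topicConfigs() {
        Properties configs = new Properties();
        // Force an fsync after every N messages (1 = synchronous flush per message).
        configs.put("flush.messages", "1");
        // Or: force an fsync at most every N milliseconds.
        configs.put("flush.ms", "1000");
        return configs;
    }
}
```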

ES

On each node, an ES write involves several concepts: the buffer, the translog, refresh, flush, fsync, and the commit point.

  • Buffer: an in-memory area managed by the JVM, limited in size
  • Translog: a log file that records every document operation
  • Refresh: runs every 1s by default, writing the data in the buffer into a new segment (in the file cache); from this point the data is searchable
  • Flush: writes all remaining buffer data into new segments, flushes the segments from the system file cache to disk, and generates a commit point file at the same time
  • Fsync: flushes the translog from the file cache to disk
  • Commit point: a file generated when segment files are flushed to disk, recording information about the committed segment files

Here is the specific writing process:

  1. A write request arrives: the document is written to the buffer and synchronously appended to the translog (which at this point lives in the file cache)
  2. By default, every 1s, or when the buffer is full, the buffered data is refreshed into a new segment file in the file cache and the buffer is cleared; from this moment the data can be searched, which is why ES search is near-real-time
  3. Every 30 minutes, or when the translog reaches a certain size, a flush writes all segment files from the file cache to disk, also flushes the translog, and creates a commit point file recording which segments were flushed; finally the translog is cleared
  4. In the background, ES fsyncs the translog to disk every 5s (configurable); if the machine loses power within that 5s window, data can genuinely be lost
  5. As segment files on disk accumulate, they are merged: merging drops the records marked in the .del file and creates a new commit point file

That is how ES data is written to a shard; the risk of data loss arises in step 4.
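A hedged sketch of the settings that control this behavior, sent with Java's built-in HTTP client (index.refresh_interval, index.translog.durability, and index.translog.sync_interval are real ES index settings; the host and index name are placeholders):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: tuning the refresh/translog knobs from the write process above.
// "request" durability fsyncs the translog on every request (safer, slower);
// "async" fsyncs every sync_interval (faster, can lose that window on power loss).
public class EsDurabilitySettings {
    public static void main(String[] args) throws Exception {
        String body = """
            {
              "index.refresh_interval": "1s",
              "index.translog.durability": "async",
              "index.translog.sync_interval": "5s"
            }
            """;
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_settings")) // placeholders
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```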

Comparing ES and Kafka, we can see that both make extensive use of the OS page cache, which spares JVM memory while delivering safe and fast data writes.

Conclusion

Through comparison we can understand the reasons behind different designs and spot the excellent ones. But the final goal is not just familiarity with the "wheels" themselves; more importantly, it is to fold these excellent design ideas into the architecture of our own projects, really putting what we learn to use.

Over several updates, this article has briefly analyzed the internal design principles of Kafka and ES, but many details are still missing: for example, how Kafka data replicas are updated, and the read paths (Kafka consumption, ES search) have not been covered in detail. Of course, I will keep updating.

Hardcore coders are not just coders; they are also porters of knowledge!