## First, the basic principle of reading and writing
The process of writing data to Elasticsearch
1) The client selects a node to send the request to, and the node is a coordinating node. 2) The document is routed, 3) The primary shard on the actual node processes the request, and then synchronizes the data to the Replica node 4) Coordinating node, If it is found that the primary node and all replica nodes are repaired, the replica node returns the response result to the client
Procedure for Elasticsearch to read data
1) The client sends the request to any node as coordinate Node 2) Coordinate Node routes the Document and forwards the request to the corresponding node. In this case, round-robin random polling algorithm is used. Select a random one from the primary shard and all replicas to balance the read request load. 3) The node receiving the request sends document to coordinate Node. 4) The Coordinate Node sends document to the client
1. When a document is written, each document will be automatically assigned a globally unique ID (doc ID) and hash route to the corresponding Primary SHard based on the DOC ID. You can also specify doc ids manually, such as order IDS and user ids.
2. While reading the document, you can query through the doc id, then according to the doc id hash, judging from the doc id assigned to which shard to above, from the shard to query
Elasticsearch Searches for data
The most powerful of ES is to perform full-text retrieval 1) The client sends the request to a coordinate node 2) The coordinate node forwards the search request to the corresponding Primary shard or Replica Shard of all shards 3) Query phase: Each SHard will return its search results (actually some doc ids) to the coordinating node, which will perform data merging, sorting, paging and other operations to produce the final results. Then the coordination node pulls the actual Document data from each node according to the doc ID, and finally returns it to the client
The basic principle of search: inverted index
Elasticsearch writes data to data
1) Write buffer first, in which the data cannot be searched; Data is also written to a Translog log file. 2) If the segment is nearly full, refresh the buffer to a new segment file, but the segment file is not directly stored in the disk file, but is stored in the OS cache. This process is refresh. Every 1 second, es writes the data in the buffer to a new segment file. Every second, es generates a new disk file, the Segment file. This segment file stores the data written to the buffer in the last 1 second. If there is no segment file in the buffer, refresh the segment file every second. If there is no segment file in the buffer, refresh the segment file every second. In operating systems, disk files actually have something called OS cache, which means that before writing data to disk files, it goes to the OS cache, a memory cache at the operating system level. As long as the data in the buffer is flushed to the OS cache by refresh operation, it means that the data can be searched.
Why is ES quasi-real time? NRT is near real-time. The default is refresh every second, so ES is quasi-real-time because written data is not seen until one second later.
You can use the ES restful API or Java API to manually perform a refresh operation, that is, manually flush the data in the buffer to the OS cache, so that the data can be immediately searched.
As soon as the data is entered into the OS cache, the buffer is cleared, because the data is persisted to disk in the Translog because there is no need to keep the buffer
Second, performance tuning
System-level tuning
System-level tuning is about setting up memory and avoiding swapping memory. ES is installed with a default heap of 1GB, which is obviously not enough, so the next question arises: How much memory do we need for ES? It really depends on the memory size of our cluster nodes, and whether we want to deploy other services on the server nodes. If the memory is relatively large, such as 64GB or more, and no other services are deployed on the ES cluster, then it is recommended to set the ES memory to 31G-32GB, because there is a 32GB performance bottleneck, which means that even if you give the ES cluster more than 32GB, Performance is not necessarily better, or even worse than 31G-32GB. When setting up ES cluster memory, it is also important to ensure that the minimum heap memory (Xms) is the same size as the maximum heap memory (Xmx) to prevent applications from changing the heap size at runtime, which can be a costly process for the system.
Disabling swap can cause fatal performance problems if memory and disk swaps are allowed. The swap space is a disk space that is used by the operating system to store the page data that is not commonly used by the operating system. In this way, more memory can be allocated for page cache. This generally improves system throughput and IO performance, but also causes problems. Frequent page switching causes I/O reads and writes and interrupts the operating system, which affects system performance. The larger this value is, the more actively the operating system uses swap space. Bootstrap. memory_lock: true in elasticSearch.yml to keep the JVM locked and ES performance guaranteed.
Sharding and copy
Shard: ES is a distributed search engine. The index is usually divided into different parts. The part of data distributed on different nodes is called shard. ES automatically manages and organizes sharding, and rebalances shard data when necessary, so users don’t have to worry about the details of shard processing. The default number of shards for index creation is 5 and cannot be changed once the index is created.
Replica: ES Creates one replica by default. That is, each of the five main shards has a corresponding replica. Extra copies have advantages and disadvantages. Some copies have stronger fault recovery capability, but they also occupy the disk space of the corresponding multiple copies.
How many shards and copies should we create when creating an index?
For the number of copies, it is better to determine, according to the number of nodes in our cluster and our storage space, our cluster server is many, and there is enough storage space, you can set the number of copies, generally 1-3 copies, if the cluster server is relatively few and storage space is not so loose, You can set only one copy to ensure disaster recovery (the number of copies can be dynamically adjusted).
For the number of fragments, it’s more difficult to determine. Once the number of index fragments is determined, it cannot be changed. Therefore, before creating an index, we should fully consider the amount of data stored in the index we create later. Otherwise, creating an inappropriate number of index fragments will have a great impact on our performance.
As for the number of fragments, the industry agrees that the number of fragments depends on the memory. It is believed that 1GB of heap memory corresponds to 20 to 25 fragments, and the size of a fragment should not exceed 50 gb. Such a configuration contributes to the health of the cluster. However, I personally think such configuration method is too rigid. In the process of tuning ES cluster, I set corresponding fragments according to the size of total data to ensure that the size of each fragment does not exceed 50G (about 40G). However, compared with the previous number of fragments query, the effect is not obvious. After that, we tried to increase the number of fragments, and found that the query speed was significantly improved when the number of fragments increased, and the data volume of each fragment was controlled at about 10G.
Querying a large number of small shards makes each shard process data faster. Is it true that the more shards we have, the faster our queries will be and the better ES performance will be? In fact, it is not, because there is a shard merge process during the query process. If the number of shards increases, the merge time increases, and as more tasks need to be queued and processed sequentially, more small shards are not necessarily faster than querying a smaller number of larger shards. If there are multiple concurrent queries, having many small fragments can also reduce query throughput.
If you have a scenario where the number of shards is not appropriate and you don’t know how to adjust, a good solution is to create an index by time and then run a wildmatch query. If the amount of data on a daily basis is large, you can create an index on a daily basis. If the amount of data on a monthly basis is large, you can create an index on a monthly basis. If the existing index needs to be sharded again, the index needs to be rebuilt. The number of shards for each index can be set based on the total amount of data, write pressure, and number of nodes, and then the number of shards can be checked periodically according to the data growth status.
CES technical team of Tencent cloud recommends that: for indexes with a small amount of data (less than 100GB), the write and query pressure is relatively low. Generally, set the number of shards to 3 to 5 and the number_of_replicas to 1. For indexes with a large amount of data (more than 100GB), the data volume of a single shard is generally controlled within (20GB to 50GB) and the index pressure is allocated to multiple nodes: By the index. The routing. Allocation. Total_shards_per_node parameters, enforced a node on the index number of shards, as far as possible let shard distribution on different nodes Considering the index number of shards, If the number of shards (excluding duplicates) exceeds 50, the rejection rate may increase. In this case, you can split the index into multiple independent indexes to share the data volume and use routing together to reduce the number of shards to be accessed for each query.
Below, I will introduce some tuning of ES key parameters. There are a lot of scenarios about how much CPU our ES cluster is using and how to adjust. CPU usage is high. It may be caused by writing, or it may be caused by querying. How to check? * elasticsearch[{node}][search][T#10] * elasticsearch[{node}][search][T#10] * elasticsearch[{node}] Elasticsearch [{node}][bulk][T#1] is caused by data writing. In actual tuning, CPU utilization is high and Solid State disks are used instead of mechanical hard disks. Compared with mechanical disks, SSDS have higher read/write speed and stability. If not SSD, it is suggested that the index. The merge. The scheduler. Max_thread_count: 1 index merge the maximum number of threads is set to 1, the parameters can effectively regulate the performance of the writing. Because concurrent writes are performed on storage media, write performance is not improved, but decreased, due to addressing.
There are also several important parameters can be set, students can depend on their cluster situation and data situation.
Index. refresh_interval: this parameter indicates the number of seconds after data is written that can be searched. The default value is 1s. Each refresh of the index generates a new Lucene segment, which results in frequent merge behavior. If business needs are not so real-time, you can increase this parameter. The actual tuning tells me that this parameter is really powerful and CPU usage plummets.
Indices. The memory. Index_buffer_size: If we are going to do very heavy concurrent writes, it is better to make indices.memory.index_buffer_size larger. The index buffer size is common to all shards, up to 512MB for each shard. Because you can’t do much better than that. ES uses this setting as the index buffer shared by each shard, and shards that are particularly active use this buffer more often. The default value for this parameter is 10%, which is 10% of the JVM heap.
Translog: ES To prevent data loss, each time the INDEX, BULK, DELETE, and Update operations are complete, the Translog is refreshed to disks. While improving data security, it certainly degrades performance a bit. If you don’t care about this possibility and want performance first, you can set the following parameters:
"index.translog": {
"sync_interval": "120s".#sync interval increase
"durability": "async".# async update
"flush_threshold_size":"1g" #log file size
}
Copy the code
In this setting, asynchronous write is enabled and the interval and size of write is set, which improves write performance. Up the number
To ensure that the created ES index is evenly distributed on each Datanode, the number of shards with the same index on a Datanode should not exceed three. Calculation formula: (number_of_SHard * (1+number_of_replicas)) < 3*number_of_datanodes Specifies the number of shards allocated to each machine “index.routing.allocation.total_shards_per_node”: “2
Disk cache parameters
The vm.dirty_background_ratio parameter specifies that when the number of dirty pages cached by the file system reaches 5% of the system memory, background write back processes such as PDflush, Flush, and kdmFlush will run and asynchronously flush dirty pages to the external memory.
vm.dirty_ratio
This parameter specifies that when the number of dirty pages in the file system cache reaches the percentage of the system memory (for example, 10%), the system has to start processing dirty pages in the cache. During this process, many application processes may block because the system switches to processing file IO.
If the parameter is appropriately reduced, the principle is similar to (1). If the percentage of cached dirty data (MemTotal in this case) exceeds the threshold, the system stops all I/O writes at the application layer and waits for the I/O to resume after the data is flushed. So in case of triggering this operation of the system, the impact on the user is very big.
sysctl -w vm.dirty_ratio=10
sysctl -w vm.dirty_background_ratio=5
Copy the code
To make the Settings permanent, write the above configuration items to the /etc/sysctl.conf file
vm.dirty_ratio = 10
vm.dirty_background_ratio = 5
Copy the code
Merge Related parameters
"index.merge.policy.floor_segment": "100mb"."index.merge.scheduler.max_thread_count": "1"."index.merge.policy.min_merge_size": "10mb"
Copy the code
There are also some Settings for timeout parameters:
Ping_timeout Specifies the timeout setting for discovering that other nodes are alive during the master election. Discovery.ze.fd. ping_interval Specifies the frequency at which the node is pinged. Ping_timeout Indicates the response time for a node to survive. The default value is 30s. If the network may have hidden faults, Ping_retries ping fails or times out. The default value is 3Copy the code
Configure Linux system parameters
File handle
Linux, the maximum number of file handles each process the default is 1000, for the server process, obviously too small, by modifying the/etc/security/limits the conf to increase the handle to open the largest number
* - nofile 65535
Copy the code
Read optimization
For example, if you want to query the size pieces of data from the beginning of the fragment, you need to query the from+size pieces of data ranked first in each fragment. The collaborative node aggregates the collected N ×(from+size) pieces of data, sorts them again, and then returns size pieces of data starting from from+size. When one of the values of FROM, size, or n is large, the number of queries that need to be sorted increases. Such queries consume a lot of CPU resources, resulting in a decrease in efficiency. To improve query efficiency, ES provides Scroll and Scroll-scan query modes. Scroll: is designed to retrieve a large number of results. For example, we need to query 1 to 100 pages of data, 100 entries per page. If Search is used, the highest score from+100 data needs to be queried on each shard every time, and then the cooperative nodes aggregate the collected N ×(from+100) data and sort again. Each time 100 pieces of data starting from+1 are returned and the execution is repeated 100 times. For example, if you use Scroll query, query 10000 pieces of data in each fragment, merge and sort N ×10000 pieces of data together with nodes, and snapshot the top 10000 results. This has the advantage of reducing the number of queries and sorts.
Other Suggestions
Insert index automatically generated id: when the write end using a specific id to write data into the ES, ES will check whether there is the same id under the corresponding index, this operation will increase in the number of documents to make more and more expensive, so if there are no rigid requirements on business recommended ES automatically generated id, accelerate the write speed.
Avoid sparse indexes: Sparse indexes cause index files to grow. The ES keyword and array type adopt doc_values structure. Even if the field is empty, each document will occupy a certain amount of space. Therefore, sparse index will cause disk enlargement and reduce the efficiency of query and writing.
Parameter tuning
index.merge.scheduler.max_thread_count:1 Merge Maximum number of threads
indices.memory.index_buffer_size:30% # memory
index.translog.durability:async # This can asynchronously write disk, increase write speed
index.translog.sync_interval:120s #translog interval
discovery.zen.ping_timeout:120s # Heartbeat timeout
discovery.zen.fd.ping_interval:120s # Node detection time
discovery.zen.fd.ping_timeout:120s Ping timeout
discovery.zen.fd.ping_retries:6 Number of heartbeat retries
thread_pool.bulk.size:20 Since the query threads are specified in the code, I only adjust the number of write threads
thread_pool.bulk.queue_size:1000 Write thread queue size
index.refresh_interval:300s #index Refresh interval
bootstrap.memory_lock: trueTo keep the JVM locked and ES performance guaranteed.
Copy the code
About rebuilding indexes
Before rebuilding an index, consider the need to rebuild an index, which can be very time consuming. The ES REindex API does not attempt to set the target index and does not copy the Settings of the source index, so we should set the target index before running the _reindex operation, including setting the mapping, sharding, copy, etc.
The first step is to create a new index as you would a normal index. When there is a large amount of data, set refresh_intervals to -1, indicating that refresh_intervals are not refreshed, and number_of_replicas to 0 (because the number of replicas can be adjusted dynamically, which helps speed).
{
"settings": {
"number_of_shards": "50"."number_of_replicas": "0"."index": { "refresh_interval": "1"}}"mappings": {}}Copy the code
The second step is to call the reindex interface, with the recommended parameter wait_for_completion=false, so that the reindex will return taskId directly.
POST _reindex? wait_for_completion=false { "source": { "index": "old_index"// Original index"size": 5000 // The amount of data processed in a batch},"dest": { "index": "new_index", // Target index}}Copy the code
Step three: Wait. GET _tasks? Detailed = True&Actions =*reindex to check the progress of the rebuild. To cancel a task, call _tasks/node_id:task_id/_cancel.
Step 4: Delete the old index to free up disk space. When reconstructing an index, add the timestamp of the last reconstructing index in the parameter. For example, if our data is 100 GB, then we rebuild the index, but the 100 GB is increasing, then we need to record the timestamp of the reconstructing index. The purpose of recording the timestamp is that the next index run does not rebuild the entire index, but only after the timestamp, and so on until the old and new indexes have the same amount of data, and the data flow is changed to the name of the new index.
POST /_reindex
{
"conflicts": "proceed"// Skip the conflict, otherwise an exception will be thrown and the task will stop"source": { "index": "old_index"/ / the old index"query": { "constant_score" :
{ "filter" : {
"range" : { "data_update_time" :
{ "gte": 123456789 //reindex Milliseconds before the start time timestamp}}}}}},"dest": { "index": "new_index"// New index"version_type": "external"}}Copy the code