Fifth, distributed features and the distributed search mechanism

1. The cluster's distributed model, master election, and the split-brain problem

We know that ES is a distributed architecture by nature.
1.1 Distributed Features

Benefits of ElasticSearch's distributed architecture:

- Supports horizontal scaling of storage capacity, up to PB-level data
- Improves system availability: some nodes can stop serving without affecting the service of the whole cluster

How ElasticSearch's distributed architecture is organized:

- Different clusters are distinguished by name; the default name is `elasticsearch`
- The name can be set in the configuration file, or on the command line with `-E cluster.name=geektime`
1.2 Nodes of an ElasticSearch Cluster

- A node is an instance of ElasticSearch
  - It is essentially a Java process
  - You can run multiple ElasticSearch processes on one machine, but in production it is generally recommended to run only one ElasticSearch instance per machine
- Each node has a name, configured through the configuration file or specified at startup with `-E node.name=geektime`
- Each node is assigned a UID after it starts, which is stored in the `data` directory
1.3 Coordinating Node

- The node that receives and handles a request is called the Coordinating Node
  - It routes requests to the correct node; for example, a request to create an index must be routed to the Master node
- By default, every node is a Coordinating Node
- By setting all other node types to false, a node becomes a Dedicated Coordinating Node
1.4 Data Node

- A node that can hold data is called a Data Node
  - A node is a data node by default when it starts. You can set `node.data: false` to disable this
- The responsibility of a Data Node: store shard data. It plays a crucial role in data scaling (the Master Node decides how to distribute shards to data nodes)
- Adding data nodes
  - Solves horizontal data scaling and avoids a single point of failure for the data
1.5 Master Node

The responsibilities of the Master Node:

- Handles index creation/deletion requests and decides which nodes shards are allocated to
- Maintains and updates the Cluster State

Best practices for the Master Node:

- Master nodes are very important. Deployment must take them into account to avoid a single point of failure
- Configure multiple Master nodes for a cluster, and have each such node take on only the single Master role
1.6 Master Eligible Nodes & the Election Process

- A cluster can be configured with multiple Master Eligible nodes. When necessary (e.g., Master node failure, network failure), these nodes participate in the election process and can become the Master node
- Every node is a Master eligible node by default when it starts
  - You can set `node.master: false` to disable this
- When the first Master eligible node in the cluster starts, it elects itself as the Master node
1.6.1 The Election Process of Master Eligible Nodes

- Nodes ping each other; the node with the lowest Node Id becomes the candidate to be elected
- Other nodes join the cluster but do not take on the Master role. Once the elected master node is found to be lost, a new Master node is elected
1.7 Cluster Information

- Cluster state information (Cluster State) maintains the information a cluster needs:
  - Information about all nodes
  - All indexes and their related Mapping and Setting information
  - Shard routing information
- Cluster state information is stored on every node
- However, only the Master node can modify the cluster state information and synchronize it to the other nodes
  - If any node could modify it, the Cluster State would become inconsistent
1.8 The Split-Brain Problem

- Split-Brain: when a network problem occurs and one node cannot connect to the others
  - Node2 and Node3 hold a new election and choose a new Master
  - Node1 still considers itself Master, forms its own cluster, and updates its own Cluster State
  - There are now two Master nodes in the cluster, maintaining different Cluster States. When the network recovers, the correct state cannot be chosen for recovery
1.8.1 How to Avoid the Split-Brain Problem

- Restrict the election condition by setting a Quorum: an election can be held only when the number of Master eligible nodes reaches the Quorum
  - Quorum = (number of Master nodes / 2) + 1
  - With three Master eligible nodes, setting `discovery.zen.minimum_master_nodes` to 2 avoids split brain
- Starting with 7.0, this configuration is no longer required
  - The `minimum_master_nodes` parameter is removed; Elasticsearch itself chooses the nodes that can form a quorum
  - Typical master elections now complete in a very short time. Cluster scaling becomes safer and easier, and there are fewer system configuration options that can cause data loss
  - Nodes record their status more clearly, which helps diagnose why they cannot join a cluster or why a master node cannot be elected
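The pre-7.0 quorum rule can be sketched in `elasticsearch.yml`. This is a minimal illustration, assuming a cluster with exactly three master-eligible nodes; the setting was removed in 7.0 and later:

```yaml
# elasticsearch.yml (pre-7.0 only; removed in 7.0+)
# With 3 master-eligible nodes: Quorum = (3 / 2) + 1 = 2
discovery.zen.minimum_master_nodes: 2
```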
1.9 Configuring Node Types

By default, a node is a Master eligible node, a data node, and an ingest node.

Node type | Configuration parameter | Default value |
---|---|---|
master eligible | `node.master` | `true` |
data | `node.data` | `true` |
ingest | `node.ingest` | `true` |
coordinating only | none | set all three parameters above to `false` |
machine learning | `node.ml` | `true` (requires enabling x-pack) |
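As a sketch of the "coordinating only" row above (using the pre-7.x boolean settings shown in the table; newer versions configure roles differently), a dedicated coordinating node's `elasticsearch.yml` might contain:

```yaml
# Disable all three roles; the node then only routes and coordinates requests
node.master: false
node.data: false
node.ingest: false
```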
2. Shards and Cluster Failover
2.1 Primary Shard – Increases System Storage Capacity

- Sharding is the cornerstone of ElasticSearch's distributed storage
  - Primary shards / replica shards
- With primary shards (Primary Shard), the data of one index can be distributed across multiple Data Nodes, achieving horizontal scaling of storage
- The number of primary shards is specified when the index is created and cannot be changed by default. To change the value, you need to rebuild the index
2.2 Replica Shard – Improves Data Availability

- Data availability
  - Introducing replica shards (Replica Shard) improves data availability. Once a primary shard is lost, a replica shard can be promoted to primary. The number of replica shards can be adjusted dynamically, so the cluster keeps complete copies of the data across nodes. If you do not configure replica shards, data may be lost once a node suffers a hardware failure
- Improves the read performance of the system
  - Replica shards stay synchronized with their primary shard. Adding Replicas can improve read throughput to a certain extent
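As a sketch of the two settings discussed above (the index name `my_index` is a placeholder): the primary shard count is fixed at index creation, while the replica count can be adjusted dynamically:

```
# Create an index with 3 primary shards, each with 1 replica
PUT my_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}

# Later, dynamically raise the replica count to 2
PUT my_index/_settings
{
  "number_of_replicas": 2
}
```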
2.3 Setting the Number of Shards

- You need to plan the number of primary shards and replica shards for an index
  - If the number of primary shards is too small: for example, an Index created with 1 Primary Shard
    - If the index grows quickly, the cluster cannot scale the index by adding nodes
  - If the number of primary shards is too large, each Shard holds little data, resulting in too many shards on a node and hurting performance
  - If the number of replica shards is too high, the write performance of the cluster is degraded
2.4 Cluster Node Failures

2.4.1 Single-Node Cluster

- Replica shards cannot be allocated, so the cluster status is yellow (Yellow)
- We can add a data node so that the replica shards can be allocated to the second node

2.4.2 Adding a Data Node

- The cluster status turns green
- The cluster now has failover capability
- Try setting Replica to 2 or 3 and observe the cluster status

2.4.3 Adding Another Data Node

- The cluster has failover capability
- The Master node determines which node each shard is allocated to
- You can also add nodes to improve the computing capability of the cluster

2.4.4 Failover

- Three nodes form a cluster, containing an index configured with 3 Primary Shards and 1 Replica Shard
- Node 1 is the Master node. It fails unexpectedly, and the cluster re-elects a Master node
- R0 on Node 3 is promoted to P0, and the cluster turns yellow
- After R0 and R1 are reallocated, the cluster turns green
2.5 Cluster Health Status

- Green: healthy; all primary and replica shards are available
- Yellow: sub-healthy; all primary shards are available, but some replica shards are not
- Red: unhealthy; some primary shards are not available
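You can check this via the cluster health API; the `status` field of the response is green, yellow, or red:

```
GET _cluster/health
```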
3. Shards and Their Lifecycle
3.1 Internal Principles of Shards

- What is an ES shard?
  - It is the smallest unit of work in ES / it is a Lucene Index
- Some questions:
  - Why is ES search near real time (a document becomes searchable about 1 second after it is written)?
  - How does ES ensure that data is not lost in the event of a power outage?
  - Why does deleting a document not immediately free up space?
3.2 Immutability of the Inverted Index

- The inverted index uses an Immutable Design: once generated, it cannot be changed
- Immutability brings the following benefits:
  - No need to worry about concurrent writes to files, avoiding the performance problems associated with locking
  - Once read into the kernel's file system cache, it stays there. As long as there is enough space in the file system, most requests go directly to memory and do not hit disk, which greatly improves performance
  - Caches are easy to generate and maintain / data can be compressed
- The challenge of immutability: if a new document needs to be searchable, the entire index needs to be rebuilt
www.zhulou.net/post/8005.h…
3.3 Lucene Index

- In Lucene, a single inverted index file is called a Segment. A Segment is self-contained and cannot be changed. Multiple Segments together are called a Lucene Index, which corresponds to a Shard in ES
- When a new document is written, a new Segment is generated; queries run against all Segments simultaneously and the results are aggregated. Lucene has a file that records all Segments, called the Commit Point
- Information about deleted documents is saved in the `.del` file
3.4 What Is Refresh

When ES writes a document, instead of writing it directly into a Segment, it first writes it into a buffer called the Index Buffer, whose contents are later written into a Segment.

- The process of writing the Index Buffer into a Segment is called Refresh. Refresh does not perform an fsync operation
- Refresh frequency: once per second by default, configurable via `index.refresh_interval`. After a Refresh, the data becomes searchable. That is why ElasticSearch is called near-real-time search
- If the system writes a lot of data, it will produce many Segments
- A Refresh is also triggered when the Index Buffer is full; its default size is 10% of the JVM heap
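As a sketch (the index name `my_index` is a placeholder), the refresh interval is a per-index dynamic setting; a longer interval trades search freshness for fewer Segments:

```
PUT my_index/_settings
{
  "index": {
    "refresh_interval": "30s"
  }
}
```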
3.5 What Is the Transaction Log

- Writing a Segment to disk is time-consuming, so with the help of the file system cache, Refresh first writes the Segment into the cache and opens it for queries
- To ensure that data is not lost, a Transaction Log is written at the same time the document is indexed. In recent versions, the Transaction Log is flushed to disk by default. There is one Transaction Log per shard
- When ES performs a Refresh, the Index Buffer is cleared, but the Transaction Log is not emptied
3.6 What Is Flush

ES Flush & Lucene Commit:

- Calls Refresh, emptying the Index Buffer
- Calls fsync to write the cached Segments to disk
- Empties (deletes) the Transaction Log

Flush is triggered:

- By default, every 30 minutes
- When the Transaction Log is full (default 512MB)
3.7 The Merge Operation

With Flush (3.6 above), Segments are written to disk. As time goes by, more and more Segment files accumulate on disk, so these Segment files need regular processing.

- ES and Lucene perform Merge operations automatically; you can also trigger one manually with `POST my_index/_forcemerge`
- When there are many Segments, they need to be merged periodically
  - To reduce the number of Segments, and to physically remove deleted documents
4. Distributed Queries and Relevance Scoring
4.1 How Distributed Search Works

- An ElasticSearch search is conducted in two phases:
  - Phase 1: Query
  - Phase 2: Fetch
- Together this is called Query-then-Fetch
4.2 The Query Phase

- The user sends a search request to an ES node. After receiving the request, the node takes on the Coordinating node role and selects 3 of the index's 6 primary and replica shards, sending them the query request
  - (A caveat: the selection described here is not truly random; see this Elastic guide: www.elastic.co/guide/cn/el…)
- The selected shards execute the query and sort the results. Each shard then returns From + Size sorted document Ids and sort values to the Coordinating node
4.3 The Fetch Phase

- The Coordinating Node re-sorts the sorted document Id lists obtained from each shard in the Query phase, then selects the document Ids from From to From + Size
- It issues Multi get requests to the corresponding shards to retrieve the detailed document data
4.4 Potential Problems of Query Then Fetch

- Performance issues
  - Number of documents to examine on each shard = from + size
  - The coordinating node ultimately needs to process: number_of_shards * (from + size) documents
  - Deep pagination makes this worse
- Relevance scoring
  - Each shard calculates relevance based only on the data in its own shard, which can lead to scoring deviations, especially when there is little data. Relevance scores are independent between shards. When the total number of documents is small, if there is more than one primary shard, then the more primary shards there are, the less accurate the relevance calculation
4.5 Ways to Fix Inaccurate Scoring

- If the amount of data is small, set the number of primary shards to 1
- When the amount of data is large enough, the results are generally unbiased as long as documents are evenly distributed across the shards
- Use DFS Query Then Fetch
  - Specify the parameter in the search URL: `_search?search_type=dfs_query_then_fetch`
  - This collects the term frequencies and document frequencies from every shard and then performs a complete relevance calculation. It costs more CPU and memory and performs poorly, so it is generally not recommended
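A minimal sketch of the URL parameter, reusing the sample e-commerce index that appears in the sorting examples of this article (the query text is illustrative):

```
POST kibana_sample_data_ecommerce/_search?search_type=dfs_query_then_fetch
{
  "query": {
    "match": { "customer_full_name": "Eddie" }
  }
}
```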
5. Sorting and Doc Values & Fielddata

By default, ElasticSearch sorts results by score; you can specify your own sort rules via the sort parameter.
5.1 Sorting

- By default, ElasticSearch sorts results in descending order of relevance score
- You can set the `sort` parameter to define your own sort order
- If the sort does not use `_score`, the score in the results is null
5.2 Examples

5.2.1 Single-Field Sort

Let's look at an example where we sort orders in descending order by time:

```
POST /kibana_sample_data_ecommerce/_search
{
  "size": 5,
  "query": { "match_all": {} },
  "sort": [
    { "order_date": { "order": "desc" } }
  ]
}
```
5.2.2 Multi-Field Sort

```
POST /kibana_sample_data_ecommerce/_search
{
  "size": 5,
  "query": { "match_all": {} },
  "sort": [
    { "order_date": { "order": "desc" } },
    { "_doc": { "order": "asc" } },
    { "_score": { "order": "desc" } }
  ]
}
```
5.2.3 Sorting on a Text-Type Field

If we try to sort on a text-type field, we get an error indicating that we need to turn on the fielddata property:

```
# Enable fielddata on the text field
PUT kibana_sample_data_ecommerce/_mapping
{
  "properties": {
    "customer_full_name": {
      "type": "text",
      "fielddata": true,
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    }
  }
}
```
5.3 How Sorting Works

- Sorting is done against the original content of a field; the inverted index plays no role here
- A forward index is needed: quickly obtaining a field's original content via the document Id and field name
- ElasticSearch has two ways to do this:
  - Fielddata
  - Doc Values (columnar storage; not available for text-type fields)
- (That is why ES asks us to turn on fielddata when we sort on a text-type field.)
5.3.1 Doc Values VS Fielddata

| Doc Values | Fielddata |
---|---|---|
When created | Created along with the inverted index at index time | Created dynamically at search time |
Where created | Disk files | JVM Heap |
Advantages | Avoids a large memory footprint | Fast indexing; no extra disk space |
Disadvantages | Slows indexing and takes extra disk space | With many documents, dynamic creation is costly and occupies too much JVM Heap |
Default | ES 2.x and later | ES 1.x and earlier |
5.3.2 Turning Off Doc Values

- The feature is enabled by default and can be turned off in the Mapping
  - Doing so increases indexing speed and reduces disk space
- If you turn it back on, you need to rebuild the index
- When should it be turned off?
  - When you are certain the field will not be used for sorting or aggregation analysis
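A minimal sketch of turning off Doc Values in a Mapping, assuming a hypothetical index `my_index` with a keyword field `session_id` that will never be sorted or aggregated on:

```
PUT my_index
{
  "mappings": {
    "properties": {
      "session_id": {
        "type": "keyword",
        "doc_values": false
      }
    }
  }
}
```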
6. Paging and Traversal: From, Size, Search After & Scroll API
6.1 From / Size

- By default, queries are sorted by relevance score and return the top 10 records
- The easy-to-understand paging scheme:
  - From: starting position
  - Size: expected total number of documents
6.2 Deep Paging in a Distributed System

- ES is distributed by nature. The data behind a query is stored on multiple shards across multiple machines, yet ES must still satisfy sorting requirements (sorting by relevance score)
- Consider a query with From=990 and Size=10:
  - 1000 documents are first fetched on each shard. The Coordinating Node then aggregates all the results, and finally the top 1000 documents are selected by sorting
  - The deeper the page, the more memory is consumed. To avoid the memory overhead of deep paging, ES limits results to 10000 documents by default: `index.max_result_window`
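The limit is a per-index dynamic setting; raising it is possible (sketched below with a placeholder index name) but increases the memory cost of deep pages:

```
PUT my_index/_settings
{
  "index.max_result_window": 20000
}
```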
6.3 Simple Paging Example

```
POST tmdb/_search
{
  "from": 0,
  "size": 20,
  "query": { "match_all": {} }
}
```

- ES reports an error when from + size exceeds 10000
6.4 Search After: Avoiding the Deep Paging Problem

We know that the deeper the page, the more memory we consume, so how do we avoid the deep paging problem?

- Search After avoids the performance problem of deep paging and retrieves the next page of results in real time
  - It does not support specifying a page number (From)
  - You can only page forward
- The first search must specify `sort` and guarantee that the sort values are unique (which can be done by adding `_id`)
- Each subsequent query then uses the sort values of the last document of the previous result
6.4.1 A Search After Example

1. Prepare the data:

```
# Search After
DELETE users
POST users/_doc
{"name": "user1","age":10}
POST users/_doc
{"name": "user2","age":11}
POST users/_doc
{"name": "user3","age":12}
POST users/_doc
{"name": "user4","age":13}
```

2. Query with Search After (the first search must specify `sort` and guarantee unique sort values, which can be done by adding `_id`):
```
POST users/_search
{
  "size": 1,
  "query": {
    "match_all": {}
  },
  "sort": [
    {"age": "desc"},
    {"_id": "asc"}
  ]
}
```
3. Query again with Search After (using the sort values of the last document from the previous result):

```
POST users/_search
{
  "size": 1,
  "query": { "match_all": {} },
  "search_after": [ 13, "ka36BngBc4LlHCXeCOxJ" ],
  "sort": [
    {"age": "desc"},
    {"_id": "asc"}
  ]
}
```

And so on: each query passes in the sort values of the last document from the previous result, until the result is empty.
6.4.2 How Search After Solves Deep Paging

- Assume Size is 10
- To query documents 990 to 1000
- By locating via the unique sort values, each shard only needs to process 10 documents at a time
6.5 Scroll API (Mostly Used in Real Scenarios)

ElasticSearch also provides the Scroll API to iterate over results. The first invocation specifies the scroll's lifetime; within that time we can process the query results, passing the Scroll Id from the previous query to scroll through the next batch of data.

It also has limitations, because the request creates a snapshot:

- After the snapshot is created, newly written data cannot be found
- Each subsequent query must pass in the previous Scroll Id
6.5.1 A Simple Scroll API Demo

1. Insert three documents:

```
DELETE users
POST users/_doc
{"name": "user1","age":10}
POST users/_doc
{"name": "user2","age":20}
POST users/_doc
{"name": "user3","age":30}
POST users/_count
```

2. Create a snapshot based on the Scroll API (the response returns a scroll id such as `DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAFdgWOFN4NEJyd05ScnFmVjRzM3M2dzdvUQ==`):

```
POST /users/_search?scroll=5m
{
  "size": 1,
  "query": {
    "match_all": {}
  }
}
```

3. Try writing another document:

```
POST users/_count
POST users/_doc
{"name": "user4","age":40}
```
4. Scroll through with the Scroll API:

```
POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAFdgWOFN4NEJyd05ScnFmVjRzM3M2dzdvUQ=="
}
```

Pass the scroll_id saved in the previous step into this request to get the next batch of data. Since size is 1 here, we get one document per call. And when we scroll to the end, we never see the document inserted in step 3.
6.6 Different Search Types and Their Application Scenarios

- Regular
  - Need real-time access to the top documents, e.g., querying the latest orders
- Scroll
  - Need the full set of documents, e.g., exporting all data
- Pagination
  - From and Size
  - If deep paging is required, choose Search After
7. Handling Concurrent Reads and Writes

7.1 Why Concurrency Control Is Necessary

- If two web programs update the same document at the same time, the lack of effective concurrency control can cause changes to be lost
- Pessimistic concurrency control
  - Assumes change conflicts are possible and locks resources to prevent them, e.g., database row locks
- Optimistic concurrency control
  - Assumes conflicts will not occur and does not block the operations being attempted. If the data has been modified between read and write, the update fails. The application then decides how to resolve the conflict, e.g., retrying the update, using the new data, or reporting the error to the user
- ES uses optimistic concurrency control
7.2 Optimistic Concurrency Control in ES

- Documents in ES are immutable. When you update a document, the old document is marked as deleted and a brand-new document is added, with the document's version field incremented by 1
- Internal version control: `if_seq_no + if_primary_term`
- External versioning (when another database is used as the primary data store): `version + version_type=external`
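A sketch of both styles (the index name `products`, the document contents, and the seq_no/primary_term/version values are all illustrative assumptions; in practice the values come from a previous read of the document):

```
# Create the document; the response carries _seq_no and _primary_term
PUT products/_doc/1
{"title": "iphone", "count": 100}

# Internal version control: the update succeeds only if the document still has
# the _seq_no and _primary_term we observed when we last read it
PUT products/_doc/1?if_seq_no=0&if_primary_term=1
{"title": "iphone", "count": 99}

# External versioning: the version maintained by the external primary datastore
# is authoritative; the write succeeds only if it exceeds the current version
PUT products/_doc/1?version=30000&version_type=external
{"title": "iphone", "count": 98}
```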