Cabbage Java self study room covers core knowledge


1. The ElasticSearch index is dynamically updated

1.1. Immutability of indexes


Because of the structural nature of inverted indexes, modifying them after they have been created can be very complicated. Adding several layers of index nesting makes index updating an almost impossible task. So it is simply designed to be immutable: the inverted index is immutable after being written to disk, and it never changes.

Immutability has important value:

  1. No locks required. If you never update indexes, you don’t need to worry about multiple processes modifying data at the same time.
  2. Once the index is read into the kernel’s file system cache, it stays there because of its immutability. As long as there is enough space in the file system cache, most read requests go directly to memory and do not hit disk. This provides a significant performance boost.
  3. Other caches, like the filter cache, remain in effect for the lifetime of the index. They do not need to be rebuilt every time the data changes, because the data does not change.
  4. Writing a single large inverted index allows data compression, reducing disk I/O and the amount of indexes that need to be cached into memory.

Of course, an immutable index has downsides. The main one is precisely that it is immutable: you cannot modify it. If you need to make a new document searchable, you have to rebuild the entire index. This places a significant limit either on the amount of data an index can contain or on how often the index can be updated.

1.2. Dynamic update of indexes

How to update an inverted index while preserving invariance? The answer is: more indexes.

Add new supplementary indexes to reflect recent changes, rather than rewriting the entire inverted index. Each inverted index is queried in turn – starting with the earliest – and then the results are merged.

Elasticsearch is built on Lucene, which introduces the concept of per-segment search. Each segment is itself an inverted index. In addition to segments, a Lucene index also has the concept of a commit point: a file that lists all known segments. New documents are first added to an in-memory indexing buffer and are later written to a disk-based segment.

Queries in Lucene are executed segment by segment; each segment can be regarded as a separate sub-index. During indexing, Lucene continuously flushes in-memory data to create new segments. Multiple segments are also merged into one larger segment; old segments that are still being read by queries are not deleted immediately, and merged segments that are no longer being read are deleted.

The dynamic update process of the index is roughly as follows:

  1. The data is first written into the in-memory buffer and, at the same time, appended to the translog log file. Note: at this point the data has not yet been indexed into a segment, so it cannot be searched.

  2. When the buffer is nearly full or a certain amount of time has elapsed, ES refreshes the buffered data into a new segment file. The data does not go straight into a segment file on disk; it first goes into the OS cache. This process is called refresh.

By default, ES refreshes the buffer into a new segment file every second, so a new segment file is created each second, containing the data written to the buffer during the previous second. In the operating system, disk files sit behind an operating system cache (the OS cache), so the data lands in the OS cache before it reaches the disk file.

Once the data in the buffer is flushed to the OS cache by refresh operation, it means that the data can be searched.

This is why ES is called near real-time (NRT): written data is refreshed every second by default, so it only becomes searchable about one second after it is written, hence near real-time.

As soon as the segment file is in the OS cache, the buffer is cleared, while the translog file keeps a persistent record of the data on disk. The segment file can then serve external searches.
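
The following is an illustrative sketch only, assuming the official Python client (elasticsearch-py, 7.x-style API) and a hypothetical index named my_index; it shows how a refresh can be forced manually and how the default 1-second refresh interval can be changed:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Force a refresh so that everything in the indexing buffer becomes searchable now.
    es.indices.refresh(index="my_index")

    # Relax the default 1-second refresh interval to trade freshness for indexing throughput.
    es.indices.put_settings(index="my_index", body={"index": {"refresh_interval": "30s"}})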

  3. Steps 1 and 2 repeat: new data keeps flowing into the buffer and the translog, and segment file after segment file is written. After each refresh the buffer is cleared, while the translog keeps accumulating a log of the operations. As this goes on, the translog grows; when it reaches a certain size, a commit operation is triggered.

  1. The first step of the commit is to refresh any data still in the buffer into the OS cache, clearing the buffer.

  2. A commit point is written to a disk file, identifying all the segment files that belong to this commit point, and all data currently in the OS cache is forced to fsync to the disk files.

  3. The existing translog is emptied and a new translog is started, at which point the commit operation is complete.

What is a Translog log file for?

As the flow above shows, documents that sit only in memory and the file system cache between fsync operations are not safe and can be lost in a power outage. So ES introduced the translog to record all operations between fsyncs; when the machine recovers from a failure or restarts, ES can replay the translog to restore the data.

Of course, the translog itself is buffered in memory first and can also be lost in a power outage. Therefore, ES writes the translog to disk every five seconds, or after a write request completes. An operation on a document can be considered safe and recoverable once it has been written to disk, so ES returns a success response to the client only after the record of the current operation has been written to disk.
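
As an illustration only (same assumptions as above: elasticsearch-py and a hypothetical my_index), the two behaviours just described map onto the real index settings index.translog.durability and index.translog.sync_interval:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # durability=request (the default) fsyncs the translog before acknowledging each write;
    # durability=async fsyncs it only every sync_interval, trading safety for throughput.
    es.indices.put_settings(
        index="my_index",
        body={"index": {"translog": {"durability": "async", "sync_interval": "5s"}}},
    )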

  4. In addition, because a new segment is created every second, the number of segments quickly becomes large. A search on a shard has to query each of that shard's segments in turn, which reduces search efficiency. ES therefore automatically merges segments into larger segments in the background. The merge actually produces a new segment; once the new segment has been written to disk, all the old segments that were merged into it are deleted. (A force-merge sketch follows.)
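
A hedged sketch of triggering this merge by hand (same elasticsearch-py assumptions; typically only useful on an index that is no longer being written to):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Ask ES to merge the index's segments down to (at most) one segment per shard.
    es.indices.forcemerge(index="my_index", max_num_segments=1)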

2. Updating and deleting ElasticSearch documents

2.1. Document updates

When you create a new document, Elasticsearch assigns it a version number, and each change to the document produces a new version number. When an update is performed, the old version is marked as deleted in the .del file, and the new version is indexed into a new segment. The old version may still match a search query, but it is filtered out of the results.

2.2. Document deletion

Segments are immutable, so you can neither remove the document from the old segment nor modify the old segment to reflect the update of the document.

Each segment on disk has a .del file associated with it. When a delete request is sent, the document is not actually deleted; it is marked as deleted in the .del file. The document may still match a search, but it is filtered out of the results. When segments are merged, documents marked as deleted in the .del file are not included in the new segment.

2.3. Concurrency control

In the database world, two methods are commonly used to ensure that changes are not lost in concurrent updates:

2.3.1. Pessimistic concurrency control

This approach, widely used by relational databases, assumes that change conflicts are likely to occur and therefore blocks access to resources to prevent conflicts. A typical example is to lock a row of data before reading it, ensuring that only the thread that placed the lock can modify the row.

2.3.2. Optimistic concurrency control

The approach used by Elasticsearch assumes that conflicts are unlikely to occur and does not block the operations being attempted. However, if the underlying data is modified between the read and the write, the update fails, and the application then decides how to resolve the conflict: it can retry the update, use the new data, or report the situation back to the user.

Elasticsearch is distributed. When a document is created, updated, or deleted, the new version of the document must be copied to other nodes in the cluster. Elasticsearch is also asynchronous and concurrent, which means these replication requests are sent in parallel and may arrive at their destination out of order. Elasticsearch needs a way to ensure that the old version of a document does not overwrite the new version.

Each document has a _version (version number), which increases when the document is modified. Elasticsearch uses this _version number to ensure that changes are executed in the correct order. If an old version of the document arrives after the new version, it can simply be ignored.

  • Using internal version numbers: when deleting or updating data, carry the _version parameter. If the latest version of the document is not that version number, the operation fails. This version number is generated automatically by ES and increments by one after each operation.
  • Using external version numbers: by default ES uses its own increasing integer as the version number, but an externally maintained integer (type long), such as a timestamp, can be used instead by adding the parameter version_type=external. External version numbers are handled slightly differently from the internal ones discussed above: instead of checking whether the current _version is the same as the version specified in the request, Elasticsearch checks whether the current _version is less than the specified version number. If the request succeeds, the external version number is stored as the document's new _version (see the sketch after this list).
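
A minimal sketch of optimistic concurrency control with an external version number, assuming elasticsearch-py and a hypothetical products index (the version values are made up):

    from elasticsearch import Elasticsearch
    from elasticsearch.exceptions import ConflictError

    es = Elasticsearch("http://localhost:9200")

    # First write: accepted because 1000 is greater than any existing _version.
    es.index(index="products", id="1", body={"price": 10},
             version=1000, version_type="external")

    # Stale write: rejected with a 409 conflict because 999 < 1000.
    try:
        es.index(index="products", id="1", body={"price": 9},
                 version=999, version_type="external")
    except ConflictError:
        print("stale external version, write rejected")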

3. Data read and write principles of Elasticsearch

3.1. Data writing principle of ES

Each doc determines which shard to write to using the following formula:

shard = hash(routing) % number_of_primary_shards

Routing is a variable value. The default value is the _id of the document. You can also customize a routing rule.

By default, the primary shard requires that a quorum, or majority, of the shard copies be available before performing a write operation. This is to prevent data from being written to the wrong side of a network partition.

A quorum is determined by the following formula:

int((primary + number_of_replicas) / 2) + 1  // number_of_replicas is the number of replicas specified in the index settings

The values that determine consistency are: one (only primary shard), all (the primary and all replicas), or the default quorum.
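
A quick worked example of the quorum formula above in plain Python (primary is 1 for a single shard group):

    def write_quorum(number_of_replicas: int, primary: int = 1) -> int:
        # int((primary + number_of_replicas) / 2) + 1
        return (primary + number_of_replicas) // 2 + 1

    print(write_quorum(number_of_replicas=1))  # 1 primary + 1 replica  -> 2 copies required
    print(write_quorum(number_of_replicas=2))  # 1 primary + 2 replicas -> 2 copies required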

If not enough shard copies are available, Elasticsearch waits until the operation times out; the default wait is one minute.

  1. After a new document is indexed, it is first written to memory; to guard against data loss, a copy of the data is also appended to the translog. New documents keep being written to memory and to the transaction log. At this point the new data cannot yet be retrieved or queried.
  2. When the default refresh interval is reached, or a certain amount of data has accumulated in memory, a Refresh is triggered: the in-memory data is refreshed into a new segment in the file system cache, and the memory buffer is cleared. Although the new segment has not yet been committed to disk, it can already serve document retrieval; it cannot be modified.
  3. As new documents keep being indexed, a Flush is triggered when the translog exceeds 512 MB or 30 minutes have elapsed (see the sketch after this list). Any data still in memory is written to a new segment in the file system cache, the data in the file system cache is flushed to disk via fsync, a commit point is generated, the old translog is deleted, and a new empty translog is created.
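
The size threshold is configurable. A hedged sketch, again assuming elasticsearch-py and a hypothetical my_index (index.translog.flush_threshold_size is a real setting; the 30-minute trigger mentioned above is version-dependent):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Flush once the translog reaches 1 GB instead of the 512 MB default.
    es.indices.put_settings(
        index="my_index",
        body={"index": {"translog": {"flush_threshold_size": "1gb"}}},
    )

    # A flush (commit) can also be forced manually.
    es.indices.flush(index="my_index")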

3.2. Principle of ES reading data

Elasticsearch queries fall into two main categories:

  • Get request: queries a specific Doc by ID;
  • Search request: matches Docs with a query.
  1. A Get request first checks the translog for the latest change to the Doc; if nothing is found there, it then queries the segments. This kind of query is Real Time. This lookup order guarantees that the Doc returned is the latest version, which is how real-time requirements are met in NoSQL-style scenarios.
  2. A Search query searches the in-memory and on-disk segments together, then merges the results and returns them. This kind of query is Near Real Time, mainly because index data held in memory takes some time to be refreshed into a segment.
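
For orientation only, the two request types as issued through elasticsearch-py (hypothetical products index):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Get request: fetch one Doc by ID (real time).
    doc = es.get(index="products", id="1")

    # Search request: match Docs with a query (near real time).
    hits = es.search(index="products", body={"query": {"match": {"name": "phone"}}})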

3.2.1. query_then_fetch

All search systems use a two-phase query: the first phase retrieves the DocIDs, and the second phase fetches the complete documents for those DocIDs. In Elasticsearch this is called query_then_fetch. There is also a single-phase variant, query_and_fetch, which is typically used only for requests that need to query a single Shard.

3.2.2. DFS_query_then_fetch

Besides one-phase and two-phase queries, there is also a three-phase query. Elasticsearch's base relevance score is computed from Term Frequency (TF) and Document Frequency (DF), but each Shard computes TF and DF independently. Although _routing ensures that Docs are distributed evenly at write time, it cannot guarantee that TF and DF are distributed evenly, so the local TF and DF can be inaccurate, and the scores computed from them are inaccurate as well.

To solve this, Elasticsearch introduced the DFS query types, such as DFS_query_then_fetch: it first collects the TF and DF values from all Shards, carries them in the request, and then runs an ordinary query_then_fetch, so that TF and DF are accurate when the scores are computed; DFS_query_and_fetch is analogous. The advantage of this kind of query is more accurate scoring, but it is less efficient. Another option is to replace the TF/DF model with BM25.

In newer versions of Elasticsearch you can no longer specify DFS_query_and_fetch or query_and_fetch yourself; these variants can only be chosen internally by Elasticsearch.
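
A hedged example of requesting the three-phase variant explicitly (elasticsearch-py; the index and query are hypothetical):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # dfs_query_then_fetch gathers global TF/DF first, then scores, at some extra cost.
    hits = es.search(
        index="products",
        body={"query": {"match": {"name": "phone"}}},
        search_type="dfs_query_then_fetch",
    )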

4. Principle of ElasticSearch Distributed cluster

ElasticSearch cluster / node / shard / replica / index / document

  • Cluster: ES can act as an independent single search server. However, in order to handle large data sets, achieve fault tolerance, and high availability, ES can run on many collaborative servers. A collection of these servers is called a cluster.
  • Node: Each server that forms a cluster is called a Node, and a Node can contain multiple shards.
  • Shard: When there are a large number of documents, one node may not be enough due to memory limitations, insufficient disk processing capacity, inability to respond to client requests fast enough, etc. In this case, the data can be divided into smaller pieces. Each shard is placed on a different server.

When the index of your query is distributed over multiple shards, ES sends the query to each related shard and groups the results together without the application knowing that shards exist.

  • Replica: shard copies can be used to improve query throughput or to provide high availability.

A replica is an exact copy of a shard, and each shard can have zero or more replicas. ES can hold many identical copies of a shard; one of them is chosen to handle the operations that change the index, and this special shard is called the master (primary) shard. When the master shard is lost, for example because the node holding it becomes unavailable, the cluster promotes a replica to be the new master shard.

  • Document: Refers to a row of data.
  • Index: a collection of documents (analogous to a table in an SQL database).

When we add data to Elasticsearch, we need an index: the place where related data is kept. An index is actually a logical namespace that points to one or more physical shards. A shard is the underlying unit of work and holds only a fraction of the total data; each shard is a Lucene instance and a complete search engine in its own right. Our documents are stored and indexed in shards, but applications interact with the index rather than with the shards directly.

Elasticsearch uses sharding to distribute data across the cluster. A shard is a container for data. Documents are stored in the shard, which is then distributed among nodes in the cluster. Elasticsearch automatically migrates shards between nodes as your cluster grows or shrinks, so that data is still evenly distributed across the cluster.

A shard can be either a master shard or a replica shard. Every document in the index belongs to a master shard, so the number of master shards determines the maximum amount of data the index can hold. A replica shard is just a copy of a master shard; replica shards serve as redundant backups that protect the data against hardware failure, and they serve read operations such as searching and returning documents. The number of master shards is fixed when the index is created, but the number of replica shards can be changed at any time.

4.1. Node Roles

Servers in an ES cluster are divided into the following roles:

  • The master node maintains:
  1. Cluster configuration information
  2. Node information of the cluster
  3. Template Settings
  4. Indexes and their corresponding settings, mappings, analyzers, and aliases
  5. The shard to which the index is associated and the node to which it is assigned
  • The data nodes:
  1. Responsible for data storage and query
  • The coordinating node:
  1. Routes index requests
  2. Aggregates search result sets
  3. Distributes bulk index requests

4.1.1. Master election

Election strategy

If the cluster already has a master, the node joins the cluster under that master. If the cluster has no master, the node with the smallest ID is elected as master.

Election timing

  • Cluster startup: a background thread pings the nodes in the cluster and elects the master node according to the strategy above.
  • When the existing master node leaves the cluster: a background thread periodically pings the master node, and if the ping fails a certain number of times, a new master is elected.

Avoiding split brain

Split brain is a common problem in the distributed cluster that adopts the master-slave mode. Once the split brain occurs, the cluster status will be inconsistent, resulting in data errors or even data loss.

  • ES avoids split brain with a quorum (more than half) rule: the cluster configuration can require that a master be elected only when a majority of master-eligible nodes are visible (in pre-7.0 versions this is the discovery.zen.minimum_master_nodes setting), as sketched below.
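
A hedged sketch of that setting for a pre-7.0 cluster with three master-eligible nodes (the value follows the same more-than-half formula; from 7.0 onwards the cluster handles this quorum automatically):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    master_eligible_nodes = 3
    minimum_master_nodes = master_eligible_nodes // 2 + 1   # 2

    # Equivalent to setting discovery.zen.minimum_master_nodes: 2 in elasticsearch.yml.
    es.cluster.put_settings(
        body={"persistent": {"discovery.zen.minimum_master_nodes": minimum_master_nodes}}
    )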

4.2. Data copy

ES uses replica shards to keep cluster data highly available and to increase the cluster's capacity for handling concurrent query requests; correspondingly, the write pressure on the cluster increases during the data-writing stage.

During a write, data is first routed to the primary shard and then forwarded to the replica shards. To prevent data loss, ensure that data has been written successfully to at least one replica shard before the response is returned to the client.

4.3. Horizontal Expansion

One shard from Node 1 and one shard from Node 2 have been migrated to the new Node 3, so there are now two shards on each node instead of three. This means the hardware resources (CPU, RAM, I/O) of each node are shared by fewer shards, and the performance of each shard improves.

A shard is a fully functional search engine with the ability to use all of the resources of its node. Our index with six shards (3 master shards and 3 replica shards) can therefore be expanded to at most six nodes, with one shard on each node, and each shard having all of that node's resources to itself.

But what if we want to expand by more than 6 nodes?

The number of master shards is determined when the index is created. In effect, this number defines the maximum amount of data that the index can store (the actual size depends on your data, hardware, and usage scenario). However, read operations — searching and returning data — can be handled by both master and replica shards, so the more replica shards you have, the higher throughput you will have.

The number of replica shards can be dynamically adjusted on a running cluster, and we can scale the cluster on demand. Let’s increase the number of copies from the default of 1 to 2:
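
The original snippet is not reproduced here; a hedged equivalent using elasticsearch-py and a hypothetical blogs index would look like this:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Dynamically raise the replica count; no reindexing is required.
    es.indices.put_settings(index="blogs", body={"index": {"number_of_replicas": 2}})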

4.4. Failover

Suppose we shut down the first node.

The node we shut down was the master node. A cluster must have a master node in order to work, so the first thing that happens is the election of a new master node: Node 2.

When we shut down Node 1 we also lost master shards 1 and 2, and an index cannot work properly while it is missing master shards. If we checked the cluster health at this point, we would see status red: not all master shards are active.

Fortunately, complete copies of both of those master shards exist on the other nodes, so the new master node immediately promotes the corresponding replica shards on Node 2 and Node 3 to master shards.

4.5. Routing mechanism

When a document is indexed, it is stored in a master shard. How does Elasticsearch know which shard to store a document in? When we create a document, how does it decide whether the document should be stored in shard 1 or shard 2?

First of all, it certainly can’t be random, otherwise we won’t know where to look when we need to retrieve documents in the future. In fact, the process is determined by the following formula:

shard = hash(routing) % number_of_primary_shards

Routing is a variable value, which defaults to the document's _id but can also be set to a custom value. Routing is passed through a hash function to generate a number, which is then divided by number_of_primary_shards (the number of primary shards) to obtain the remainder. This remainder, between 0 and number_of_primary_shards - 1, is the shard where the document lives.
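
A minimal sketch of the formula in plain Python. Note that Elasticsearch actually hashes the routing value with murmur3; Python's built-in hash() (which is salted per process for strings) is used here only to illustrate the shape of the calculation:

    def pick_shard(routing: str, number_of_primary_shards: int) -> int:
        # shard = hash(routing) % number_of_primary_shards
        return hash(routing) % number_of_primary_shards

    print(pick_shard("user-42", 5))  # same routing and shard count -> same shard (within one run)
    print(pick_shard("user-42", 6))  # a different shard count maps the same document elsewhere,
                                     # which is why number_of_primary_shards can never change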

This explains why the number of master shards is determined at index creation time and never changes: if the number changes, then all the values of previous routes are invalid and the document is never found again.

4.6. Create a document

New, index, and delete requests are write operations that must be completed on the master shard before they can be copied to the relevant replica shard.

The following is the sequence of steps required to successfully create, index, or delete a document on the primary shard and on any replica shards:

  1. The client sends a new, index, or delete request to Node 1.
  2. The node uses the document's _id to determine that the document belongs to shard 0. The request is forwarded to Node 3, because the primary copy of shard 0 is currently assigned to Node 3.
  3. Node 3 performs the request on the main shard. If successful, it forwards the request to the Node 1 and Node 2 replica shards in parallel. Once all replica shards report success, Node 3 reports success to the coordinating Node, which reports success to the client.

By the time the client receives a successful response, the document changes have been executed in the master shard and all replica shards, and the changes are safe.

4.7. Query Documents

Documents can be retrieved from the master shard or from any other replica shard.

The following is the sequence of steps to retrieve a document from a master or replica shard:

  1. The client sends a fetch request to Node 1.
  2. The node uses the document _id to determine that the document belongs to Shard 0. A replica shard of shard 0 exists on all three nodes. In this case, it forwards the request to Node 2.
  3. Node 2 returns the document to Node 1, which then returns the document to the client.

When handling read requests, the coordinating node load-balances by rotating through all the replica shards in round-robin fashion across requests.

When a document is retrieved, it may already have been indexed on the primary shard but not yet copied to the replica shards. In that case a replica shard may report that the document does not exist, while the primary shard returns it successfully. Once the index request has returned success to the user, the document is available on both the primary and the replica shards.

4.8. Update documents

Here are some steps to update a document:

  1. The client sends an update request to Node 1.
  2. It forwards the request to Node 3 where the main shard is located.
  3. Node 3 retrieves the document from the master shard, modifies the JSON in the _source field, and tries to re-index the document on the master shard. If the document has already been changed by another process, it retries step 3, giving up after retry_on_conflict attempts (see the sketch after this list).
  4. If Node 3 successfully updates the document, it forwards the new version of the document in parallel to shards on Node 1 and Node 2 for re-indexing. Once all replica shards return success, Node 3 returns success to the coordinator, and the coordinator returns success to the client.
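
A hedged example of setting that retry budget with elasticsearch-py (hypothetical products index; the partial-document body is made up):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Retry the read-modify-reindex cycle up to 3 times if another process
    # changes the document between our read and our write.
    es.update(
        index="products",
        id="1",
        body={"doc": {"price": 99}},
        retry_on_conflict=3,
    )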

4.9. Distributed retrieval

A CRUD operation processes only a single document, whose uniqueness is determined by a combination of _index, _type, and routing values. This means that we know exactly which shard in the cluster contains this document.

Search requires a more complex execution model because we do not know which documents the query will hit: those documents could be on any shard in the cluster. A search request must ask a copy of all the shards in the Index or Indices we are interested in to determine if they contain any matching documents.

But finding all the matching documents is only half the job. Results in multiple shards must be combined into a single sorted list before the Search interface can return a page result. To do this, the search is performed as a two-phase process called Query then Fetch.

4.9.1. Query phase

During the initial query phase, the query is broadcast to each shard copy (master shard or replica shard) in the index. Each shard performs a search locally and builds a priority queue of matching documents.

The query phase consists of the following three steps:

  1. The client sends a search request to Node 3, which creates an empty priority queue of size from + size.
  2. Node 3 forwards query requests to each master or replica shard of the index. Each shard executes the query locally and adds the result to the local ordered priority queue of size from + size.
  3. Each shard returns the IDs and sort values of all the documents in its priority queue to the coordinating node, Node 3, which merges these values into its own priority queue to produce a globally sorted list of results.

When a search request is sent to a node, that node becomes the coordination node. The task of this node is to broadcast query requests to all relevant shards and consolidate their responses into a globally sorted result set, which is returned to the client.

The coordination node merges these shard results into its own ordered priority queue, which represents the global ordering result set. This is the end of the query process.

4.9.2. Fetch phase

The query phase identifies which documents satisfy the search request, but we still need to fetch those documents, which is the task of the fetch phase.

The fetch phase consists of the following steps:

  1. The coordination node identifies which documents need to be retrieved and submits multiple GET requests to the related shard.
  2. Each shard loads the documents, enriches them if necessary, and then returns them to the coordinating node.
  3. Once all documents have been retrieved, the coordination node returns the result to the client.

The coordination node first decides which documents actually need to be retrieved. For example, if our query specified {“from”: 90, “size”: 10}, the first 90 results would be discarded, and only the 10 results starting with the 91st would need to be retrieved. These documents may come from one, more, or all of the shards associated with the original search request.

Deep paging

Each shard must first build a from + size priority queue, and the coordinating node then has to sort number_of_shards * (from + size) documents just to find the size documents that will actually be returned.

Depending on the size of your document, the number of shards, and the hardware you use, deep paging (1,000 to 5,000 pages) for 10,000 to 50,000 results is perfectly feasible. But with sufficiently large FROM values, the sorting process can become very heavy, using a lot of CPU, memory, and bandwidth.
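
A small arithmetic sketch (plain Python) of why this gets expensive, using a hypothetical page depth:

    number_of_shards = 5
    page_from, page_size = 10000, 10        # e.g. {"from": 10000, "size": 10}

    per_shard_queue = page_from + page_size              # entries each shard must sort
    coordinator_entries = number_of_shards * per_shard_queue

    print(per_shard_queue)       # 10010 entries per shard
    print(coordinator_entries)   # 50050 entries merged and sorted on the coordinating node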

5. Actual usage of ElasticSearch

5.1. Shard settings

  • If the number of shards is too small, data writing becomes a bottleneck and cannot be scaled horizontally
  • If the number of shards is too large, each shard being a full Lucene index, too many shards consume too many resources

How do you calculate the number of shards?

Note that it is best to make the number of shards an integer multiple of the number of nodes, so that the load on each host is roughly the same. Pay particular attention to this when multiple instances are deployed on one host; otherwise the other hosts may carry a normal load while one host carries an unusually high one.

Generally we calculate the number of shards from the daily data volume, and it is reasonable to keep each shard below 50 GB. If that is not enough, you may need to split the data into more indexes, or create time-based indexes (for example, hourly) behind an alias.
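
A back-of-the-envelope sizing sketch in plain Python, with made-up numbers (600 GB/day, a 50 GB per-shard target, 4 data nodes):

    import math

    daily_data_gb = 600
    target_shard_size_gb = 50
    data_nodes = 4

    shards = math.ceil(daily_data_gb / target_shard_size_gb)   # 12 shards for the day's index
    shards = math.ceil(shards / data_nodes) * data_nodes       # round up to a multiple of the node count
    print(shards)                                              # 12 -> 3 shards per node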

5.2. Data in near real time

After data is written to ES, a refresh must happen before it is indexed into a searchable segment and queries can find it. GET-by-ID queries are the exception: they return data in real time.

Before ES 5.0, the translog also provided real-time CRUD: a GET query would first check the translog for the latest changes and only then look the ID up in the segments. After 5.0, real-time reads from the translog were removed, to simplify the translog's responsibilities and optimize it for other, more important purposes.

5.3. Deep paging issues

Solution 1:

5.3.1. Scan and Scroll API server cache

In order to return one page of records, we throw away most of the other results that have already been sorted. The simple fix is to cache those results so they can be used for the next request. In this vein, ES provides the Scroll API, which is conceptually somewhat like a traditional database cursor.

The Scroll call essentially creates a snapshot in real time and holds the snapshot for a specified period of time so that the next request does not need to be reordered. In this sense, Scroll is a server cache. Since it is a cache, there are two problems:

  • Consistency. The snapshot reflects the index as it was when the scroll was created; any changes made afterwards are ignored until the snapshot expires.
  • Server cost. ES keeps a search context open for each scroll. Normally ES merges smaller segments into larger ones in the background and then drops the small segments, but segments that are still in use by a scroll cannot be dropped. This means more open file descriptors and slower indexing.

There is actually a third problem, which is not a caching problem but a consequence of the cursor mechanism ES uses: you can only scan forward sequentially and cannot jump to an arbitrary page, and the client has to carry a "cursor" (the scroll_id) with each request.
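
A hedged scroll walk-through with elasticsearch-py (hypothetical logs index; the 1-minute keep-alive and batch size are arbitrary):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Open a scroll: take a snapshot and fetch the first batch.
    resp = es.search(index="logs", scroll="1m", size=1000,
                     body={"query": {"match_all": {}}})
    scroll_id = resp["_scroll_id"]

    while resp["hits"]["hits"]:
        for hit in resp["hits"]["hits"]:
            pass  # process hit["_source"] here
        # Fetch the next batch, carrying the cursor and renewing the keep-alive.
        resp = es.scroll(scroll_id=scroll_id, scroll="1m")
        scroll_id = resp["_scroll_id"]

    # Release the server-side search context as soon as we are done.
    es.clear_scroll(scroll_id=scroll_id)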

Solution 2:

5.3.2. The Search After mechanism

The Scroll API is of course much better than from + size, but it has problems of its own:

  • The search context is expensive to keep open.
  • It is a temporary snapshot, not a real-time paging result.

In response to these problems, ES 5.0 introduced the Search After mechanism, which provides a more real-time cursor. The idea is to use the sort values of the last document on the previous page as the starting point for the request for the next page.
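
A hedged search_after sketch with elasticsearch-py (hypothetical logs index; the timestamp and log_id sort fields are made up, with log_id acting as a tie-breaker so the cursor is unambiguous):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")

    # Page 1: an ordinary query with a deterministic sort.
    page1 = es.search(index="logs", body={
        "size": 10,
        "sort": [{"timestamp": "desc"}, {"log_id": "asc"}],
        "query": {"match_all": {}},
    })

    # Page 2: pass the sort values of the last hit on page 1 as search_after.
    last_sort = page1["hits"]["hits"][-1]["sort"]
    page2 = es.search(index="logs", body={
        "size": 10,
        "sort": [{"timestamp": "desc"}, {"log_id": "asc"}],
        "search_after": last_sort,
        "query": {"match_all": {}},
    })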
