Preface

Elasticsearch is a real-time distributed search and analytics engine that lets you explore your data at a speed and scale never seen before. It is used for full-text search, structured search, analytics, and combinations of the three. Elasticsearch can scale horizontally to hundreds (or even thousands) of server nodes while handling petabytes of data.

Elasticsearch is distributed, but we rarely have to deal with that directly. We just start up a number of ES instances and assign them the same cluster.name so that they join the same cluster. When creating an index, we only need to specify the number of shards and replicas; the rest is left to ES's internal implementation.

Solr, by contrast, requires you to define routing rules and data synchronization policies, and it relies on ZooKeeper for its distribution; Elasticsearch has no such dependency.

So we can say that Elasticsearch is distributed by nature and designed to shield you from the complexity of distribution. Elasticsearch is almost transparent in this respect: we can easily run it on a single node on a laptop, and it works just as well on a cluster of 100 nodes.

Elasticsearch hides the complexity of distributed systems as much as possible. Here are some examples of operations that are performed automatically in the background:

  • Assigning documents to different containers, or shards, which may be stored on one node or on multiple nodes.
  • Distributing these shards evenly across the nodes in the cluster to load-balance indexing and search.
  • Replicating each shard to provide data redundancy and prevent data loss from hardware failures.
  • Routing requests from any node in the cluster to the nodes that hold the relevant data.
  • Seamlessly integrating new nodes as the cluster grows, and reallocating shards to recover from failed nodes.

It is possible to use Elasticsearch without understanding its distributed internals, but knowing them gives you a more complete picture of Elasticsearch from another perspective. This article focuses on that distributed implementation.

Principles of the cluster

From previous experience with distributed systems, we know that we can gain performance either by buying more powerful servers (vertical scaling, or scaling up) or by buying more servers (horizontal scaling, or scaling out).

While Elasticsearch can benefit from more powerful hardware, such as SSD storage, vertical scaling has its limits in both technology and price. Real scalability comes from horizontal scaling: adding more nodes to the cluster and spreading load and reliability across them.

For most databases, scaling horizontally usually requires significant changes to your application to take advantage of the extra resources. By contrast, Elasticsearch is distributed by nature: it knows how to manage multiple nodes to improve scalability and availability, which means your application does not have to care about this problem. So how is it managed?

The master node

Starting an ES instance creates a node, and nodes are added to a cluster by setting the same cluster.name in the configuration file. A cluster is therefore composed of one or more nodes with the same cluster.name configuration, which together share the data and the load. When a node joins or leaves the cluster, the cluster redistributes all data evenly.
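For example, a minimal elasticsearch.yml sketch for two nodes joining the same cluster (the cluster and node names here are hypothetical placeholders):

# elasticsearch.yml on the first node
cluster.name: my-es-cluster    # every node that should join this cluster uses the same value
node.name: node-1

# elasticsearch.yml on the second node
cluster.name: my-es-cluster
node.name: node-2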

As in the master-slave mode of other clustered components (MySQL, Redis), an ES cluster elects one node as the master node. The master node is responsible for maintaining the global cluster state and for redistributing shards when nodes join or leave the cluster. For details on master node elections, see Primary Node Elections.

None of the major document-level APIs (index, delete, search) needs to communicate with the master node; it is not involved in document-level changes or searches, so even a cluster with a single master node does not become a bottleneck as traffic grows. Any node can become the master node; if the cluster contains only one node, that node is also the master.

So if we use Kibana as a visual management tool, we just add elasticsearch.url: "http://localhost:9200" to the kibana.yml configuration file to point it at one node. Through that node, ES automatically provides information about all nodes, shards, and replicas, which is why Kibana is usually deployed on the same server as the master node.
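A minimal sketch of that setting (the URL assumes a node running locally on the default HTTP port; note that in Kibana 7+ this setting was renamed elasticsearch.hosts):

# kibana.yml
elasticsearch.url: "http://localhost:9200"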

As users, we can send requests to any node in the cluster, including the master node. Every node knows where each document lives and can forward our request directly to the node that stores it. Whichever node we send the request to, that node takes care of collecting data from the nodes holding the documents we need and returning the final result to the client. Elasticsearch manages all of this transparently.

Discovery mechanism

How does ES add nodes to the same cluster just by configuring the same cluster.name? The answer is the discovery module.

The discovery mechanism is responsible for discovering the nodes in a cluster and for electing the master node. Every time the cluster state changes, the other nodes in the cluster learn about it (how, exactly, depends on which discovery mechanism is used).

At present, ES mainly recommends the following automatic discovery mechanisms:

  1. Azure Classic Discovery plug-in mode, multicast
  2. EC2 Discovery plug-in mode, multicast
  3. Google Compute Engine (GCE) Discovery plug-in, multicast
  4. Zen Discovery default implementation, multicast/unicast

The definitions and differences of unicast, multicast and broadcast are introduced here to help us better understand the discovery mechanism.

Unicast, multicast, broadcast differences:

  • Unicast: Communication between network nodes is like a conversation between people. If one person talks to another, the term used in network technology is “unicast”, in which information is received and transmitted only between two nodes. For example, when you send and receive emails or browse the Web, you must establish a connection with the mail server or Web server. In this case, unicast data transmission is used.

  • Multicast: Also known as multipoint broadcast, multicast is like one person speaking to a selected group of people (but not everyone) to make communication more efficient. If unicast were used instead, data would have to be transmitted node by node, producing as many transmission processes as there are target nodes, which is clearly inefficient. It is like wanting to tell certain people the same thing without others knowing: phoning them one by one would be tedious. In multicast mode, the data is transmitted to all target nodes at once, and only to those specific recipients. Multicast has relatively few applications in networking; online video conferencing and video on demand are particularly well suited to it.

  • Broadcast: Like one person addressing the entire audience through a loudspeaker. The advantage is efficiency: the information is delivered to everyone at once. Broadcast transmits data to all nodes in one pass, without distinguishing specific receivers.

Of the Discovery mechanisms listed above, Zen Discovery is the ES default built-in Discovery mechanism. It provides unicast and multicast discovery methods, and can be extended to support cloud and other forms of discovery through plug-ins. So let’s focus on how Zen Discovery is used in Elasticsearch.

A cluster consists of nodes with the same cluster.name. When you start a second node on the same machine, as long as it has the same cluster.name configuration as the first node, it will automatically discover the cluster and join it. However, when starting nodes on different machines, you need to configure a list of connected unicast hosts in order to join the same cluster.

The unicast host list is configured via the discovery.zen.ping.unicast.hosts setting in the elasticsearch.yml file:

discovery.zen.ping.unicast.hosts: ["host1", "host2:port"]

The value is either an array of hosts or a comma-separated string. Each value should take the form host:port or host (where the port defaults to transport.profiles.default.port, falling back to transport.tcp.port if that is not set). Note that an IPv6 host must be enclosed in square brackets. The default value of this setting is 127.0.0.1, [::1].

Elasticsearch officially recommends using unicast instead of multicast. And Elasticsearch is configured to use unicast discovery by default to prevent nodes from unintentionally joining the cluster. Only nodes running on the same machine automatically form a cluster.

Although multicast is still available as a plug-in, it should never be used in production; otherwise you will end up with nodes accidentally joining your production cluster because they received an errant multicast ping. There is nothing wrong with multicast per se, but it leads to silly problems and makes clusters fragile (for example, a network engineer tinkers with the network without telling you, and suddenly none of the nodes can find each other).

With unicast, you can provide Elasticsearch with a list of nodes it should try to connect to. When a node contacts a member of the unicast list, it gets the status of all nodes in the entire cluster, and then it contacts the master node and joins the cluster.

This means your unicast list does not need to contain all the nodes in the cluster; it only needs enough nodes that a new node can reach one of them and talk to it. If you use your master-eligible nodes as the unicast list, you only need to list those (typically three).
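A minimal sketch of such a unicast list, assuming three hypothetical master-eligible hosts:

# elasticsearch.yml on every node in the cluster
discovery.zen.ping.unicast.hosts: ["master1.example.com", "master2.example.com", "master3.example.com"]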

For more information about Elasticsearch node Discovery, see Zen Discovery.

Dealing with failure

Anyone familiar with distributed systems knows that they are designed to improve availability and fault tolerance. The single point of failure problem exists in ES as well.

Single node problem

If we start a single node containing no data and no indexes, our cluster is a cluster with one empty node, called an empty cluster.

When only one node is running in the cluster, it is a single point of failure: there is no redundancy. The biggest problem with a single point is low fault tolerance; if the server hosting that single node fails, the whole ES service stops working.

Let's create an index named user in a cluster containing one empty node. By default an index is allocated 5 primary shards and 1 replica of each primary, but for demonstration purposes we will allocate 3 primary shards and 1 replica (one replica shard per primary shard):

PUT /user
{
   "settings" : {
      "number_of_shards" : 3,
      "number_of_replicas" : 1
   }
}

Our cluster now looks like the one below, with all three primary shards assigned to Node 1.

Check the cluster health status with GET /_cluster/health:

{
  "cluster_name": "elasticsearch",
  "status": "yellow",                     # 1
  "timed_out": false,
  "number_of_nodes": 1,
  "number_of_data_nodes": 1,
  "active_primary_shards": 3,
  "active_shards": 3,
  "relocating_shards": 0,
  "initializing_shards": 0,
  "unassigned_shards": 3,                 # 2
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks": 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 50
}

# 1: the cluster status is yellow. # 2: three replica shards are unassigned.

A yellow cluster health status means that all primary shards are running normally (the cluster can serve all requests), but not all replica shards are in a normal state. In fact, all three replica shards are unassigned: none of them has been allocated to any node. It makes no sense to keep original data and its copies on the same node, because if we lose that node, we also lose every copy stored on it.

A primary shard and its replica shards are never placed on the same node. Therefore, the maximum useful number of replicas is n - 1, where n is the number of nodes.

Although our cluster is currently up and running, there is a risk of data loss in the event of a hardware failure.

Horizontal scaling

Since a single point is problematic, we just need to start a few more nodes and have them join the current cluster to improve availability and enable failover; this is called horizontal scaling.

Using the user index example above, after we add a second node, the cluster looks like the figure below.

When the second node joins the cluster, the three replica shards are allocated to it, one for each primary shard. From then on, if any single node in the cluster fails, our data remains intact.

All newly indexed documents are stored first on a primary shard and then copied in parallel to the corresponding replica shard. This ensures that documents can be retrieved from both primary and replica shards.

The cluster health now shows a green status, which means all six shards (three primaries and three replicas) are active. Our cluster is now not only fully functional but also always available.

Dynamic scaling

As the product matures and the business grows, the number of users keeps increasing, and the index capacity we designed earlier (3 primary shards and 3 replica shards) may no longer be enough. Data keeps accumulating in every primary and replica shard, and past a certain point search performance drops. So how can we scale our growing application on demand?

We continue to expand the previous two nodes horizontally and add another node. At this time, the cluster status is as shown in the figure below:

To spread the load, ES redistributes the shards. One shard from Node 1 and one shard from Node 2 have been migrated to the new Node 3, so each node now holds two shards instead of three. This means that the hardware resources (CPU, RAM, I/O) of each node are shared by fewer shards, and the performance of each shard improves.

A shard is a fully functional search engine in itself, capable of using all the resources of the node it lives on. Our index with six shards (3 primaries and 3 replicas) can be scaled out to a maximum of six nodes, with one shard per node, each shard enjoying all of its node's resources.

But what if we want to expand by more than 6 nodes?

The number of primary shards is determined when the index is created. In effect, this number defines the maximum amount of data the index can store. (The actual size also depends on your data, hardware, and usage scenario.) However, read operations (searching and returning data) can be handled by both primary and replica shards, so the more replica shards you have, the higher your read throughput can be.

The number of primary shards in an index cannot be changed after the index is created (the default is 5), but the number of replicas per primary shard (default 1) can be changed at any time on a live index. As for why the primary shard count cannot be changed after index creation, we explain this in the Principles of Document Storage section below.

Since the number of replica shards can be adjusted dynamically on a running cluster, we can scale the cluster on demand. Let's increase the number of replicas from the default of 1 to 2:

PUT /user/_settings
{
   "number_of_replicas" : 2
}

As shown in the figure below, the user index now has nine shards: three primaries and six replicas. This means we can scale the cluster out to nine nodes, one shard per node. Compared with the original three nodes, search performance can be up to three times better.

Of course, simply adding more replica shards to a cluster with the same number of nodes will not improve performance because each shard will get fewer resources from the nodes. You need to add more hardware resources to improve throughput.

But a larger number of replica shards does increase data redundancy: with the node configuration above, we can lose two nodes without losing any data.

Node failure

Suppose one of our nodes fails: the server goes down or the network becomes unreachable. Assume the failed node is the master, Node 1. The cluster state is then:

At this point, checking the cluster health shows a red status, indicating that not all primary shards are working properly.

The node we lost was the master node. A cluster must have a master node to function, so the first thing that happens is the election of a new master: Node 2.

When we shut down Node 1, we also lost primary shards 1 and 2, and an index cannot work properly while primary shards are missing.

Fortunately, complete copies of these two primary shards exist on the other nodes, so the new master node immediately promotes the corresponding replica shards on Node 2 and Node 3 to primaries, at which point the cluster status becomes yellow. The promotion of a replica to primary is instantaneous, like flipping a switch.

Why is the cluster status yellow rather than green? Although we have all three primary shards, each primary is configured with 2 replica shards, and currently each has only one. That is why the cluster cannot reach green. But there is no need to worry too much: even if we also shut down Node 2, the application can still run without losing data, because Node 3 keeps a copy of every shard.

If we restart Node 1, the cluster can re-allocate the missing replica shards and return to its previous state. If Node 1 still holds its old shards, it will try to reuse them, copying from the primary shards only the data files that were modified in the meantime.

Handling concurrency conflicts

One of the most troublesome aspects of distributed systems is concurrency conflict. Since ES is also distributed, how does it handle concurrency conflict?

Usually when we update a document with the index API, we read the original document, make our changes, and re-index the entire document. The most recent indexing request wins: whichever version of the document was indexed last is the one stored in Elasticsearch. If someone else changed the document in the meantime, their changes are silently lost.

A lot of times that’s fine. Maybe our primary datastore is a relational database and we just copy the data into Elasticsearch and make it searchable. It may be rare for two people to change the same document at the same time. Or maybe it’s not a big deal for our business to occasionally lose changes.

But sometimes missing a change can be very serious. Imagine using Elasticsearch to store the number of items in stock in our online store. Every time we sell an item, we reduce the number of items in stock in Elasticsearch.

One day, the management decided to do a promotion. Suddenly, we were selling several items a second. If you have two Web programs running in parallel, each handling all sales at the same time, you can have inconsistent inventory results.

The more frequently the data changes, and the longer the gap between reading the data and writing it back, the more likely it is that changes will be lost.

Optimistic concurrency control – version number

In the database world, two methods are commonly used to ensure that changes are not lost in concurrent updates:

  • Pessimistic locking is an approach widely used by relational databases that blocks access to resources to prevent conflicts on the assumption that change conflicts are likely to occur. A typical example is to lock a row of data before reading it, ensuring that only the thread that placed the lock can modify the row.

  • Optimistic locking, the approach used by Elasticsearch, assumes that conflicts are unlikely and does not block the operations being attempted. However, if the underlying data was modified between read and write, the update fails, and the application then decides how to resolve the conflict: for example, it can retry the update with fresh data, or report back to the user.

Recall from the document index, GET, and DELETE requests discussed earlier that every document has a _version number, which is incremented each time the document is modified.

Elasticsearch uses this _version number to ensure that changes are executed in the correct order. If an old version of the document arrives after the new version, it can simply be ignored.

You can use the _version number to ensure that conflicting changes in your application do not result in data loss. We do this by specifying the version number of the document we want to change. If this version is not the current version number, our request will fail.

All document update and delete APIs accept a version parameter, which lets you apply optimistic concurrency control in your code wherever that makes sense.
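As a minimal sketch, reusing the hypothetical website/blog index from the examples below: this update is applied only if the document's current _version is still 1. If another process has changed the document in the meantime, Elasticsearch rejects the request with a 409 Conflict, and the application can re-read the document and retry.

PUT /website/blog/1?version=1
{
  "title": "My first blog entry",
  "text":  "Starting to get the hang of this..."
}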

Optimistic concurrency control – external systems

Version is just one way to do this. You can also use external systems for version control. A common setup is to use another database as the primary data store and use Elasticsearch for data retrieval. This means that all changes to the master database need to be copied to Elasticsearch when they occur, and if multiple processes are responsible for this data synchronization, you may encounter concurrency issues similar to those described earlier.

If your primary database already has a version number, or a field such as a timestamp that can serve as one, you can reuse that version number in Elasticsearch by adding version_type=external to the query string. The version number must be an integer greater than zero and less than about 9.2e18 (a positive value of Java's long type).

The external version number is handled a little differently from the internal version number we discussed earlier. Instead of checking if the current _version is the same as the version specified in the request, Elasticsearch checks if the current _version is less than the specified version number. If the request is successful, the external version number is stored as the new _version of the document.

The external version number can be specified not only during indexing and delete requests, but also when a new document is created.

For example, to create a new blog post with external version number 5, we could do this:

PUT /website/blog/2?version=5&version_type=external
{
  "title": "My first external blog entry",
  "text":  "Starting to get the hang of this..."
}

In the response, we can see that the current _version number is 5:

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 5,
  "created":  true
}

Now we update the document and specify a new version number of 10:

PUT /website/blog/2?version=10&version_type=external
{
  "title": "My first external blog entry",
  "text":  "This is a piece of cake..."
}

Request successful and set current _version to 10:

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 10,
  "created":  false
}

If you try to re-run this request, it fails, returning the same conflict error we saw earlier, because the specified external version is not greater than the version currently stored in Elasticsearch.

Principles of Document Storage

When creating an index, we only need to specify the number of shards and replicas, and ES automatically distributes the document data across the corresponding shards and replicas. So how exactly are documents distributed to, and retrieved from, the cluster? Elasticsearch hides these low-level details so we can focus on business development, but here we dig into the core technical details to better understand how data is stored in this distributed system.

How are documents routed into shards

When a document is indexed, it is stored in a master shard. How does Elasticsearch know which shard to store a document in? When we create a document, how does it decide whether the document should be stored in shard 1 or shard 2?

First of all, it certainly can’t be random, otherwise we won’t know where to look when we need to retrieve documents in the future. In fact, the process is determined by the following formula:

shard = hash(routing) % number_of_primary_shards

routing is a variable value that defaults to the document's _id but can be set to a custom value. The routing value is run through a hash function to produce a number, which is then divided by number_of_primary_shards (the number of primary shards) to take the remainder. This remainder, always between 0 and number_of_primary_shards - 1, is the shard where the document lives. For example, with 3 primary shards, a routing value that hashes to 11 lands in shard 11 % 3 = 2.

This explains why the number of primary shards is determined at index creation time and never changes: if the number changed, all previously computed routing values would become invalid and documents could never be found again.

You might think that since the number of Elasticsearch master shards is fixed, it would be very important to pre-allocate the number of shards when creating an index.

All document APIs (GET, index, DELETE, bulk, update, and mget) accept a routing parameter, which lets you customize the document-to-shard mapping. A custom routing value can ensure that all related documents, for example all documents belonging to the same user, are stored in the same shard. More information about routing can be found in the official documentation.
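A minimal sketch of custom routing, with a hypothetical profile type and user ID; note that every request for the document must carry the same routing value, because it (not the _id) now determines the shard:

PUT /user/profile/1?routing=user123
{
  "name": "Alice"
}

GET /user/profile/1?routing=user123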

How do primary and replica shards interact

How does a primary shard interact with its replica shards?

Suppose we have a cluster of three nodes, with an index named user that has two primary shards, each with two replica shards. Copies of the same shard are never placed on the same node, so our cluster looks like the following:

We can send requests to any node in the cluster. Each node is capable of handling arbitrary requests. Each node knows any document location in the cluster, so requests can be forwarded directly to the desired node. In the following example, all requests are sent to Node 1, which we call a Coordinating Node.

When sending requests, it is good practice to round-robin through all the nodes in the cluster to spread the load.

Creating, indexing, and deleting documents are write operations, which must complete on the primary shard before they can be copied to the associated replica shards.

The following is the sequence of steps required to successfully create, index, or delete a document on the primary shard and any replica shards:

  1. The client sends a new, index, or delete request to Node 1.
  2. The node uses the document's _id to determine that the document belongs to shard 0. The request is forwarded to Node 3, because the primary of shard 0 is currently allocated there.
  3. Node 3 executes the request on the primary shard. If it succeeds, Node 3 forwards the request in parallel to the replica shards on Node 1 and Node 2. Once all replica shards report success, Node 3 reports success to the coordinating node, which in turn reports success to the client.

By the time the client receives a successful response, the document change has been executed on the primary shard and on all replica shards: the change is safe.
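One related knob (available since ES 5.x; the index, type, and document shown are hypothetical): the wait_for_active_shards parameter makes a write wait until at least the given number of shard copies is active before it proceeds:

PUT /user/profile/2?wait_for_active_shards=2
{
  "name": "Bob"
}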

When handling read requests, the coordinating node balances the load by round-robining across the replica shards on each request.

A document that has just been indexed may already exist on the primary shard but not yet have been copied to the replica shards when a read arrives. In that case, a replica shard may report that the document does not exist, while the primary returns the document successfully. Once the indexing request has returned success to the user, the document is available on both the primary and all replica shards.


Personal WeChat public account: JaJian. Follow it for regular explanations and analysis of distributed systems, microservices, and other topics from first-tier Internet companies.