We use Elasticsearch to store data consumed from Kafka. The Kafka broker version, the Elasticsearch version, and the Elasticsearch client (Transport API) version are all unchanged. Only the Kafka client changed: with the native Apache Kafka consumer API, one consumer consumed multiple partitions; after refactoring onto Spring Kafka, each partition is consumed by its own Kafka consumer. There are 9 topics with 27 partitions in total.

Problem 1: Elasticsearch throws the following exception

```
java.util.concurrent.ExecutionException: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update]];
nested: RemoteTransportException[[es03][192.168.132.115:9300][indices:data/write/update[s]]];
nested: VersionConflictEngineException[[query][c3b8666b43b7cdab0450b0620b7f5e5df0633b35]: version conflict, document already exists (current version [1])];
```
  1. Elasticsearch uses optimistic locking when writing documents, so this must be caused by concurrent writes.
  2. Under normal operation there is exactly one Kafka consumer per partition, so concurrent writes (i.e., duplicate consumption) should not occur.
  3. Point 2 overlooks an important premise: it is the shared `groupId` that guarantees a partition is consumed by at most one Kafka consumer within a group.

Conclusion: the Kafka client group ID is different from before, so consumers from both the old and new groups consumed the same partitions and wrote the same documents concurrently.
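As an illustration of the failure mode, here is a toy Python sketch (not the real Kafka or Elasticsearch APIs; all names are hypothetical): two consumer groups each receive every message, and create-only writes from the second group hit version conflicts.

```python
# Toy model of the incident: NOT the real Elasticsearch/Kafka API,
# just a stdlib sketch of why two consumer groups cause version conflicts.

class VersionConflictError(Exception):
    pass

class TinyDocStore:
    """Mimics optimistic, create-only document writes."""
    def __init__(self):
        self.docs = {}  # doc_id -> version

    def create(self, doc_id):
        if doc_id in self.docs:
            # "document already exists (current version [1])"
            raise VersionConflictError(f"{doc_id}: current version [{self.docs[doc_id]}]")
        self.docs[doc_id] = 1

def consume(group_id, messages, store, errors):
    # Each *group* receives every message, so two groups double-consume.
    for msg in messages:
        try:
            store.create(msg)
        except VersionConflictError as e:
            errors.append((group_id, str(e)))

store, errors = TinyDocStore(), []
messages = ["doc-a", "doc-b"]
consume("group-old", messages, store, errors)   # original groupId
consume("group-new", messages, store, errors)   # misconfigured new groupId
print(len(errors))  # 2: the second group conflicts on every document
```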

Tips:

Elasticsearch is distributed. When a document is created, updated, or deleted, the new version of the document must be copied to other nodes in the cluster. Elasticsearch is also asynchronous and concurrent, which means that these replication requests are sent in parallel and may not arrive at their destination in the same order. Elasticsearch needs to ensure that the old version of the document does not overwrite the new version.

To ensure that an older version of the document does not overwrite a newer one, every operation performed on a document is assigned a sequence number by the primary shard that coordinates the change. Sequence numbers increase with each operation, so a newer operation always has a higher sequence number than an older one. Elasticsearch can then use the operation's sequence number to make sure a newer document version is never overwritten by a change carrying a smaller sequence number.
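The mechanism can be sketched in a few lines (illustrative only, not the actual ES internals): a shard copy applies an incoming operation only if its sequence number is higher than the one it has stored.

```python
# Illustrative sketch (not real ES internals): a replica applies an
# operation only if its sequence number is newer than what it has stored.

def apply_op(replica, doc_id, value, seq_no):
    """replica: dict doc_id -> (value, seq_no). Returns True if applied."""
    _, stored_seq = replica.get(doc_id, (None, -1))
    if seq_no <= stored_seq:
        return False  # stale replication request: the old version must not win
    replica[doc_id] = (value, seq_no)
    return True

replica = {}
# The primary assigned seq 1 then seq 2, but the network delivered them reversed.
apply_op(replica, "doc", "newer", 2)
applied = apply_op(replica, "doc", "older", 1)
print(replica["doc"], applied)  # ('newer', 2) False
```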

Problem 2: Kafka throws the following exception

```
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
```

After checking and testing, we found the following.

Previously, the consumers relied on the default assignment strategy, RangeAssignor:

```
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
```

This was then changed to:

```
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
```

Members of a consumer group must share at least one common assignment strategy. While old members only supported RangeAssignor and new members only RoundRobinAssignor, the group had no strategy in common, so Kafka could not allocate partitions and threw the exception above.
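To see why the two strategies produce different assignments (and therefore must be agreed on group-wide), here is a simplified Python sketch of both algorithms. It mirrors the general idea of Kafka's RangeAssignor and RoundRobinAssignor but is not the client's actual code:

```python
# Sketch (not the real Kafka client code) of how RangeAssignor and
# RoundRobinAssignor distribute partitions differently across consumers.

def range_assign(consumers, topics):
    """Range: per topic, split that topic's partitions into contiguous chunks."""
    out = {c: [] for c in consumers}
    for topic, n_parts in topics.items():
        per, extra = divmod(n_parts, len(consumers))
        start = 0
        for i, c in enumerate(sorted(consumers)):
            count = per + (1 if i < extra else 0)
            out[c] += [(topic, p) for p in range(start, start + count)]
            start += count
    return out

def round_robin_assign(consumers, topics):
    """Round robin: deal ALL partitions across consumers one by one."""
    out = {c: [] for c in consumers}
    ordered = sorted(consumers)
    all_parts = [(t, p) for t in sorted(topics) for p in range(topics[t])]
    for i, tp in enumerate(all_parts):
        out[ordered[i % len(ordered)]].append(tp)
    return out

# 9 topics x 3 partitions = 27 partitions, as in this post's setup.
topics = {f"topic-{i}": 3 for i in range(9)}
consumers = ["c0", "c1"]
r = range_assign(consumers, topics)
rr = round_robin_assign(consumers, topics)
print(len(r["c0"]), len(r["c1"]))    # 18 9: range gives c0 two of every three
print(len(rr["c0"]), len(rr["c1"]))  # 14 13: round robin balances the total
```

The skew in the range case is why the team switched strategies; the exception came from mixing the two within one group.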

Elasticsearch Node Types

Master Node
  • Responsibilities
    1. Handles cluster-level operations such as creating and deleting indices, and decides which node each shard is allocated to.
    2. Maintains and updates the Cluster State. Only the master node may update the Cluster State; otherwise the cluster state can become inconsistent.
  • Best practices for Master nodes
    1. The master node is critical, so the deployment must be designed to avoid a single point of failure.
    2. Configure multiple master-eligible nodes per cluster, ideally as dedicated nodes that take on only the Master role.
Data Node
  • Responsibility: stores shard data. Data nodes are crucial for scaling data horizontally (the Master Node decides how shards are distributed across data nodes).
  • A node is a data node by default when it starts; set `node.data: false` to disable the role.
  • Adding data nodes provides horizontal data scaling and removes the single point of failure for data.
Master Eligible Nodes & the Master Election Process
  • A cluster can have multiple Master Eligible nodes. They participate in the master election and can become the Master node when necessary (for example, when the current Master node fails or the network partitions).
  • A node is master-eligible by default when it starts; set `node.master: false` to disable the role.
  • When the first master-eligible node in a cluster starts, it elects itself as the Master node.
Coordinating Node
  • The node that receives a request routes it to the correct node; for example, a create-index request must be routed to the Master node.
  • Every node is a Coordinating node by default.
  • Disabling all the other roles (Data / Master eligible / Ingest) leaves a node dedicated solely to coordination.
Summary of Node Types

| Node type | Configuration parameter | Default value |
| --- | --- | --- |
| master eligible | `node.master` | true |
| data | `node.data` | true |
| ingest | `node.ingest` | true |
| coordinating only | none | set all three parameters above to false |
| machine learning | `node.ml` | true (requires x-pack) |
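As a concrete sketch (assuming an Elasticsearch version that still uses these boolean role flags, i.e. pre-7.x, as in the table above), a dedicated coordinating-only node's `elasticsearch.yml` would disable the other three roles:

```yaml
# elasticsearch.yml for a coordinating-only node (pre-7.x role flags)
node.master: false
node.data: false
node.ingest: false
```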

Elasticsearch architecture

Elasticsearch changes document version when it is created, updated, or even deleted.

How Elasticsearch stays highly available:
  1. Data is first written to the Index Buffer (in memory) and the Transaction Log (on disk), so even if the in-memory data is lost, it can be recovered from the Transaction Log.
  2. By default, a refresh runs every 1 s, writing the Index Buffer into segments (in the file system cache); at this point the data becomes searchable.
  3. By default, a flush runs every 30 minutes, writing segments to disk and simultaneously clearing the Transaction Log; the same happens when the Transaction Log is full (512 MB by default).
  4. A merge operation periodically merges segments.

Each index operation in Elasticsearch is first resolved, using routing (usually based on the document ID), to a replication group. Once the replication group is identified, the operation is forwarded internally to the group's current primary shard. The primary shard is responsible for validating the operation and forwarding it to the replicas. Since replicas can be offline, the primary is not required to replicate to every copy. Instead, Elasticsearch maintains a list of shard copies that should receive the operation, called the in-sync copies, which is maintained by the master node. As the name implies, these are the shard copies guaranteed to have processed all index and delete operations that have been acknowledged to the user. The primary shard must therefore replicate every operation to each replica shard in this set.

The primary shard follows this basic flow:

  • Validate the incoming operation and reject it if it is structurally invalid (for example, a field whose format does not match the mapping).
  • Execute the operation locally, i.e., index or delete the relevant documents. This also validates field contents, rejecting where necessary (for example, a keyword value too long to be indexed in Lucene).
  • Forward the operation to every replica shard in the current in-sync set. If there are multiple replica shards, this happens in parallel.
  • Once all replica shards have performed the operation and responded to the primary shard, the primary acknowledges the successful completion of the request to the client.
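The flow above can be condensed into a toy sketch (not ES internals; the function and data structures are hypothetical): validate, apply locally on the primary, forward to every in-sync replica, then acknowledge.

```python
# Sketch (not ES internals): a primary validates, applies locally, forwards
# to every in-sync replica, and acknowledges only after all of them succeed.

def index_document(primary, in_sync_replicas, doc_id, doc):
    if not isinstance(doc, dict):
        raise ValueError("structurally invalid operation")  # validation step
    primary[doc_id] = doc                  # execute locally on the primary
    for replica in in_sync_replicas:       # forward to the in-sync set
        replica[doc_id] = doc
    return "acknowledged"                  # all copies have responded

primary, r1, r2 = {}, {}, {}
status = index_document(primary, [r1, r2], "1", {"msg": "hello"})
print(status, primary == r1 == r2)  # acknowledged True
```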
Lucene Index
  • In Lucene, a single inverted index file is called a Segment. Segments are self-contained and immutable; multiple segments together form Lucene's Index, which corresponds to a Shard in ES.
  • Writing a new document generates new Segments; a query searches all Segments simultaneously and merges the results. Lucene records all Segments of an index in a file called the Commit Point.
  • Information about deleted documents is kept in .del files.
What is a Refresh
  • Data is first written to the Index Buffer, where it cannot yet be queried.
  • The process of writing the Index Buffer into a Segment is called Refresh. Refresh does not perform an fsync, so it does not write data to disk.
  • Refresh frequency: once per second by default, configurable via `index.refresh_interval`. After a Refresh the data becomes searchable, which is why Elasticsearch is described as near-real-time search.
  • If the system receives a large volume of writes, a Refresh is also triggered when the Index Buffer is full (10% of the JVM heap by default).
What is a Transaction log

  • Writing a Segment to disk is relatively time-consuming, so with the help of the file system cache, Refresh first writes the Segment into the cache and opens it for queries.
  • To ensure data is not lost, the Transaction Log is written at the same time a document is indexed. In recent versions the Transaction Log is persisted to disk by default, and each shard has its own Transaction Log.
  • A Refresh in ES clears the Index Buffer but not the Transaction Log.
What is a Flush

ES Flush & Lucene Commit

  • What it does: performs a Refresh to clear the Index Buffer, fsyncs the cached Segments to disk, and clears (deletes) the Transaction Log.
  • When it runs: every 30 minutes by default, or when the Transaction Log is full (512 MB by default).
What is the Merge

  • Segments accumulate over time and need to be merged periodically:
    • reduce the number of Segments and physically remove deleted documents.
  • ES and Lucene merge automatically, and a merge can also be forced manually:
    • `POST my_index/_forcemerge`
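The Refresh/Flush lifecycle described above can be sketched as a toy model (illustrative Python, not ES internals; all names are hypothetical): refresh makes data searchable, flush makes it durable and clears the transaction log.

```python
# Toy model (not ES internals) of the Index Buffer -> Segment -> disk
# lifecycle: refresh makes data searchable, flush makes it durable.

class ToyShard:
    def __init__(self):
        self.index_buffer = []   # in memory, not yet searchable
        self.segments = []       # file system cache, searchable
        self.on_disk = []        # durable segments
        self.translog = []       # replays writes lost before a flush

    def index(self, doc):
        self.index_buffer.append(doc)
        self.translog.append(doc)       # written alongside the buffer

    def refresh(self):                  # default: every 1 s
        if self.index_buffer:
            self.segments.append(list(self.index_buffer))
            self.index_buffer.clear()   # buffer cleared, translog kept

    def flush(self):                    # default: every 30 min or 512 MB
        self.refresh()
        self.on_disk.extend(self.segments)  # "fsync" segments to disk
        self.segments.clear()
        self.translog.clear()           # durable now, log no longer needed

    def search(self, doc):
        return any(doc in seg for seg in self.segments + self.on_disk)

shard = ToyShard()
shard.index("doc-1")
print(shard.search("doc-1"))  # False: still only in the index buffer
shard.refresh()
print(shard.search("doc-1"))  # True: searchable, but not yet durable
shard.flush()
print(shard.translog)         # []: cleared by the flush
```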

Elasticsearch read process

Elasticsearch uses a primary-backup model. One advantage of this model is that the primary shard and all of its replica shards hold the same data, so a single in-sync copy is sufficient to serve a read request.

A read in Elasticsearch can be as simple as a lookup by document ID or as complex as a search with heavy aggregations, which can be CPU-intensive.

When a node receives a read request, it is responsible for forwarding the request to the nodes holding the relevant shards, merging the responses from all shards, and replying to the client (similar to MapReduce). We call this node the coordinating node for the request. The basic flow is as follows:

  • Resolve the read request to the relevant shards. Since most searches target one or more indices, they typically need to read from multiple shards, each holding a different subset of the data.
  • Select an active copy of each relevant shard from its replication group. This can be the primary shard or a replica shard; by default, Elasticsearch simply round-robins across the copies.
  • Send shard-level read requests to the selected copies.
  • Merge the results and respond. For a get-by-ID lookup, only one shard is relevant and this step can be skipped.
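The scatter/gather performed by the coordinating node can be sketched as follows (illustrative Python, not ES internals; the function and data layout are hypothetical):

```python
# Sketch (not ES internals) of the coordinating node's scatter/gather:
# pick one copy per shard, run the shard-level search, merge the results.

import itertools

def coordinate_search(shard_copies, matches):
    """shard_copies: shard_id -> list of copies (each copy: list of docs).
    matches: predicate selecting docs. Round-robins across copies."""
    rr = itertools.cycle(range(max(len(c) for c in shard_copies.values())))
    results = []
    for shard_id, copies in sorted(shard_copies.items()):
        copy = copies[next(rr) % len(copies)]  # primary or a replica
        results.extend(d for d in copy if matches(d))  # shard-level search
    return sorted(results)  # merge step before responding to the client

shards = {
    0: [["a", "b"], ["a", "b"]],  # primary and replica hold the same docs
    1: [["c"], ["c"]],
}
print(coordinate_search(shards, lambda d: d != "b"))  # ['a', 'c']
```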

Takeaways

  1. All configuration must be verified by at least two people before going live.
  2. It pays to fully understand the basic architecture and details of the middleware you use; we only dug into how Elasticsearch uses optimistic locking for document writes after the incident.

Resources

  1. www.elastic.co/guide/en/el…
  2. Elasticsearch in Action
  3. Wangnan. Tech/post/elksta…
  4. www.elastic.co/guide/en/el…

This article is written by the author and licensed under CC BY 3.0 CN. It may be freely reproduced and quoted, provided the author is credited and the original source is indicated. If reprinting to a WeChat official account, please include the author's official QR code at the end of the article.