1. Resolve concurrency conflicts

In an e-commerce scenario, the workflow is as follows:

  1. Read the commodity information, including the inventory quantity
  2. The user places an order
  3. Update the commodity information and reduce the inventory by one

In a multi-threaded service, several threads may run these three steps concurrently. Suppose two users read the same commodity data at the same moment; two threads serve them and both try to modify the inventory. The correct outcome, starting from 100 items: thread A decrements the inventory to 99, thread B then reads 99 and decrements it to 98. The wrong outcome: threads A and B both read 100, A writes 99 when it finishes, and B also writes 99 when it finishes, so one sale is lost.
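The wrong outcome can be reproduced with a minimal Python sketch. The `inventory` dictionary and the `buy_one` function are hypothetical stand-ins for the stored commodity and the order workflow, and the barrier exists only to force both threads to read before either one writes.

```python
import threading

inventory = {"quantity": 100}
both_have_read = threading.Barrier(2)  # forces the problematic interleaving

def buy_one():
    stock = inventory["quantity"]      # step 1: read the inventory
    both_have_read.wait()              # both threads have now read 100
    inventory["quantity"] = stock - 1  # step 3: write back the stale value minus one

threads = [threading.Thread(target=buy_one) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(inventory["quantity"])  # 99, although two items were sold
```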

2. Solutions

2.1 Pessimistic locking

When a thread reads the commodity data, it locks that row at the same time. Only after the thread has finished processing and released the lock can another thread start processing the row.

Pessimistic concurrency control takes a lock in every case. Once the lock is held, only one thread can operate on the data. Different scenarios call for different locks, such as row-level locks, table-level locks, read locks, and write locks.
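As a rough illustration, the same idea can be sketched in Python with `threading.Lock`. The `row_lock` here is only a hypothetical in-process stand-in for a database row-level lock; the point is that the read and the write happen while the lock is held.

```python
import threading

inventory = {"quantity": 100}
row_lock = threading.Lock()  # hypothetical stand-in for a row-level lock

def buy_one():
    with row_lock:                         # lock before reading
        stock = inventory["quantity"]      # read the inventory
        inventory["quantity"] = stock - 1  # update while still holding the lock
    # the lock is released here, so the next thread can proceed

threads = [threading.Thread(target=buy_one) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(inventory["quantity"])  # 98
```

The cost of this approach is that threads wait for each other even when no conflict would have occurred.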

2.2 Optimistic locking

Optimistic locking takes no lock at all, and every thread can operate freely. Each document in ES has a version field, which is set to 1 when the document is created and incremented by 1 on every modification. Suppose thread A and thread B read the data at the same time, both seeing version=1 and an inventory of 100. Thread A finishes first with an inventory of 99; when it writes to ES, its version number is compared with the one in ES, they match, and the write succeeds, raising the version to 2. Thread B also ends up with 99, but its version number (1) now differs from the one stored in ES (version=2). B therefore does not write 99. Instead it reads the latest data again (99, version=2), subtracts one to get 98, and repeats the write.
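The read, compare, and retry cycle can be sketched in Python as follows. `VersionedStore` and `VersionConflict` are hypothetical names for an in-memory model, not an Elasticsearch API; the internal mutex only makes the compare-and-write step atomic, which a real store does on its own side.

```python
import threading

class VersionConflict(Exception):
    pass

class VersionedStore:
    """Hypothetical in-memory store holding one versioned document."""

    def __init__(self, quantity):
        self._mutex = threading.Lock()  # makes compare-and-write atomic
        self._quantity = quantity
        self._version = 1               # 1 on creation

    def read(self):
        with self._mutex:
            return self._quantity, self._version

    def write(self, quantity, expected_version):
        with self._mutex:
            if expected_version != self._version:
                raise VersionConflict(
                    f"current version {self._version}, provided {expected_version}")
            self._quantity = quantity
            self._version += 1          # every modification adds 1

def buy_one(store):
    while True:
        quantity, version = store.read()  # re-read the latest data
        try:
            store.write(quantity - 1, expected_version=version)
            return
        except VersionConflict:
            continue                      # someone else won; try again

store = VersionedStore(100)
threads = [threading.Thread(target=buy_one, args=(store,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(store.read())  # (98, 3)
```

No thread ever waits to read; a conflict only costs the losing thread one extra read and write.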

2.3 Optimistic locks for Elasticsearch

Internally, Elasticsearch is multi-threaded and asynchronous, so multiple requests for the same document can arrive out of order.

Elasticsearch therefore applies optimistic concurrency control based on the document's _version number.

If the _version carried by a change is not the one ES expects, for example because an older change arrives after a newer one has already been applied, the change is discarded. The document is thus left in a correct state.

Code examples:

Create:

    PUT /test_index/test_type/3
    {
      "test_field": "test test"
    }

Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "3",
      "_version": 1,
      "result": "created",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": true
    }

Modify:

    PUT /test_index/test_type/3
    {
      "test_field": "test1 test1"
    }

Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "3",
      "_version": 2,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }

Delete:

    DELETE /test_index/test_type/3

Returns:

    {
      "found": true,
      "_index": "test_index",
      "_type": "test_type",
      "_id": "3",
      "_version": 3,
      "result": "deleted",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      }
    }

Re-create:

    PUT /test_index/test_type/3
    {
      "test_field": "test1 test1"
    }

Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "3",
      "_version": 4,
      "result": "created",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": true
    }

The delete operation also increments the version number of the document.

This indirectly shows that a deleted document is not physically removed right away, because some of its metadata, such as the version number, is retained. If you delete a document and then recreate it with the same ID, the new version number is the version number of the delete plus 1.

2.4 ES optimistic lock concurrency control example

  • Let’s create a new piece of data

    PUT /test_index/test_type/4
    {
      "test_field": "test"
    }
  • Simulate two clients, both get the same data

    GET /test_index/test_type/4

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 1,
      "found": true,
      "_source": {
        "test_field": "test"
      }
    }
  • One of the clients updates the data first and sends the version number it read along with the update. The update succeeds only if the version number in ES is the same as the version number held by the client

    PUT /test_index/test_type/4?version=1
    {
      "test_field": "client1 changed"
    }

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 2,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
  • The other client then tries to modify the document based on the version=1 data, also sending the version number, so optimistic lock concurrency control applies

    PUT /test_index/test_type/4?version=1
    {
      "test_field": "client2 changed"
    }

    Returns:

    {
      "error": {
        "root_cause": [
          {
            "type": "version_conflict_engine_exception",
            "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
            "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
            "shard": "2",
            "index": "test_index"
          }
        ],
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
        "shard": "2",
        "index": "test_index"
      },
      "status": 409
    }

    Optimistic locking successfully prevents concurrency problems

  • After optimistic locking successfully prevents concurrency problems, try to complete the update correctly

    Repeat the GET request to fetch the current version

    GET /test_index/test_type/4
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 2,
      "found": true,
      "_source": {
        "test_field": "client1 changed"
      }
    }

    Modify the document based on the latest data, and send the latest version number with the update. This step may need to be repeated several times before it succeeds, especially when multiple threads update the same document frequently

    PUT /test_index/test_type/4?version=2
    {
      "test_field": "client2 changed"
    }

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 3,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
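The GET, versioned PUT, and retry-on-409 flow of this section could be wrapped in a small helper, sketched here in Python. The `send(method, url, body)` transport is a hypothetical hook returning a status code and parsed JSON, and `FakeEs` is a tiny in-memory fake that mimics only the version check so the sketch runs without a cluster. It assumes the `?version=` parameter of the older ES release used in these examples; newer releases use `if_seq_no` and `if_primary_term` for this purpose.

```python
def update_with_retry(send, path, change, max_attempts=5):
    """Re-read and retry a versioned PUT until ES accepts it.

    `send` is a hypothetical transport: send(method, url, body) -> (status, json).
    `change` maps the current _source to the new _source.
    """
    for _ in range(max_attempts):
        _, doc = send("GET", path, None)              # fetch latest data and version
        new_source = change(doc["_source"])
        url = f"{path}?version={doc['_version']}"     # send the version we read
        status, result = send("PUT", url, new_source)
        if status != 409:                             # 409 means version conflict
            return result
    raise RuntimeError("gave up after repeated version conflicts")


class FakeEs:
    """In-memory fake for illustration; mimics only the version comparison."""

    def __init__(self):
        self.source = {"test_field": "test"}
        self.version = 1
        self.rival_pending = True  # another client will sneak in one write

    def __call__(self, method, url, body):
        if method == "GET":
            return 200, {"_version": self.version, "_source": dict(self.source)}
        if self.rival_pending:     # the rival's update lands just before our first PUT
            self.rival_pending = False
            self.source = {"test_field": "client1 changed"}
            self.version += 1
        provided = int(url.rsplit("version=", 1)[1])
        if provided != self.version:
            return 409, {"error": {"type": "version_conflict_engine_exception"}}
        self.source = body
        self.version += 1
        return 200, {"_version": self.version, "result": "updated"}


es = FakeEs()
result = update_with_retry(
    es, "/test_index/test_type/4",
    lambda source: {"test_field": "client2 changed"})
print(result["_version"])  # 3
```

Capping the number of attempts keeps a heavily contended document from retrying forever.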

2.5 Optimistic lock concurrency control based on an external version

ES also lets you do concurrency control based on a version number that you maintain yourself, instead of the internal _version number.

    ?version=1&version_type=external

The only difference lies in the comparison. With the internal _version, a change succeeds only if the version you provide is the same as the _version in ES. With version_type=external, a change succeeds only if the version you provide is greater than the _version in ES.

For example, when _version=1 in ES: with internal versioning the update succeeds only with ?version=1; with external versioning it succeeds only with a version greater than 1, for example ?version=2&version_type=external

Code examples:

  • Start by creating a piece of data

    PUT /test_index/test_type/5
    {
      "test_field": "external test"
    }

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 1,
      "result": "created",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": true
    }
  • Simulate two clients to query this data at the same time

    GET /test_index/test_type/5

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 1,
      "found": true,
      "_source": {
        "test_field": "external test"
      }
    }
  • The first client makes its change first. Its program takes the latest version number of the data from its own database, say 2

    PUT /test_index/test_type/5?version=2&version_type=external
    {
      "test_field": "external client1 changed"
    }

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 2,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
  • Simulate the second client, which also gets version number 2 from its own database and initiates a change based on version=2

    PUT /test_index/test_type/5?version=2&version_type=external
    {
      "test_field": "external client2 changed"
    }

    Returns:

    {
      "error": {
        "root_cause": [
          {
            "type": "version_conflict_engine_exception",
            "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
            "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
            "shard": "1",
            "index": "test_index"
          }
        ],
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
        "shard": "1",
        "index": "test_index"
      },
      "status": 409
    }
  • After the concurrency control succeeds, the update is re-initiated based on the latest version number

    GET /test_index/test_type/5

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 2,
      "found": true,
      "_source": {
        "test_field": "external client1 changed"
      }
    }

    PUT /test_index/test_type/5?version=3&version_type=external
    {
      "test_field": "external client2 changed"
    }

    Returns:

    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 3,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
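The external-version rule walked through in this section can be modelled in a few lines of Python. `ExternalVersionedDoc` and `VersionConflict` are hypothetical names for an in-memory sketch, not part of any Elasticsearch client; the sketch only mirrors the "provided version must be greater" comparison and the fact that the stored version becomes the one you provide.

```python
class VersionConflict(Exception):
    pass

class ExternalVersionedDoc:
    """Hypothetical in-memory model of the version_type=external rule."""

    def __init__(self, source, version):
        self.source = source
        self.version = version

    def index(self, source, external_version):
        # external versioning: accept only a strictly greater version
        if external_version <= self.version:
            raise VersionConflict(
                f"current version [{self.version}] is higher or equal "
                f"to the one provided [{external_version}]")
        self.source = source
        self.version = external_version  # the stored version becomes the provided one

doc = ExternalVersionedDoc({"test_field": "external test"}, version=1)

# client 1 sends version 2: accepted because 2 > 1
doc.index({"test_field": "external client1 changed"}, external_version=2)

# client 2 also sends version 2: rejected because 2 is not greater than 2
try:
    doc.index({"test_field": "external client2 changed"}, external_version=2)
except VersionConflict as err:
    print(err)

# client 2 retries with version 3: accepted because 3 > 2
doc.index({"test_field": "external client2 changed"}, external_version=3)
print(doc.version)  # 3
```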