Cache and database dual-write consistency guarantee solution

1. Business background

  • Real-time solution with high requirements

    Cache database double-write consistency is usually used in scenarios where data real-time requirements are high, such as commodity inventory services.

    Solution:

    1. In scenarios where read and write concurrency is not high, CacheAsidePattern is generally used. That is, delete the cache first and then write to the database.

    2. In the scenario with high read/write concurrency.

      In high read/write concurrency scenarios, read and write operations are concurrent. Let’s say we have an inventory of 100 in the database and 100 in the cache. There is a write request to change the inventory to 99. It is normal to delete the cache first and then change the data in the database to 99. When the read request comes in and the cache is 0, the database will be queried to get 99. Then change the cache to 99 as well. However, if the data in the database has not been changed to 99 before the write request, then the data in the database has been changed to 99 after the write request, and the data in the database has been changed to 100. This leads to data inconsistencies

      Solution:

      Serialize read/write requests. All read and write requests are queued to ensure serial execution. A thread is then attached to each queue to perform the requested operation on the queue

  • Solutions with low real-time requirements (first understand)

For data with low real-time requirement, asynchronous data update can be adopted. For example, the data requirement of the commodity details page is not very real-time, but it requires large traffic, especially the hot data with high read concurrency. At this time, there must be a cache data production service. For example, a service that updates a product updates the details page data in the database without having to react to the page in real time. At this point, the request to modify the data can be placed in a message queue, and the cache data production service listens to the message service and needs to update the data in its cache once it receives the message.

2. Organize your thoughts

What if the read request and the write request are guaranteed to be for the same item? We need to do a HASH route to ensure that requests for the same item go to the same memory queue.

Each queue corresponds to a worker thread, which takes the corresponding request and performs the corresponding operation.

3. Implementation of the plan

  • 1. The thread pool and memory queue are initialized
  • 2. Encapsulation of two request objects
  • 3. Request Service encapsulation to be performed asynchronously
  • 4. Engineering thread encapsulation of request processing
  • 5. Two Controller interfaces are encapsulated
  • 6. Read request de-optimization
  • 7. Empty data request filtering

3.1 Thread pool + memory queue initialization

When the Web container is initialized, the thread pool and memory queue need to be initialized. We can customize a listener and register the listener.

  • A new listener

  • To register the listener

  • test

Now that the container initialization process is complete, we need to implement how to initialize thread pools and memory queues.

  • New thread pool and memory queue wrapper class ThreadPoolAndQueueWrapper

    This wrapper class is used in listeners and its init() method is called to perform thread pool initialization and queue initialization. The thread starts to submit the request work.

    package com.roncoo.eshop.inventory.thread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.roncoo.eshop.inventory.request.Request;
    import com.roncoo.eshop.inventory.request.RequestQueue;
    
    /** * The thread pool and memory queue are initialized when the container is initialized@author Administrator
     *
     */
    public class ThreadPoolAndQueueWrapper {
    
    	private ExecutorService threadPool = Executors.newFixedThreadPool(10);
    	
    	public ThreadPoolAndQueueWrapper(a) {
    		RequestQueue requestQueue = RequestQueue.getInstance();
    		// The memory queue collection is filled up when initialized
    		for (int i = 0; i < 10; i++) {
    			ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
    			requestQueue.addQueue(queue);
    			// The thread pool is used to submit the worker thread for request processing
    			threadPool.submit(newRequestProcessThread(queue)); }}/** * initializes the worker thread pool and memory queue
    	public static void init(a) {
    		// Ensure that the thread pool and memory queue can be initialized only once during initialization
    		// Use static inner classes to keep threads safe
    		Singleton.getInstance();
    	}
    	
    	private static class Singleton{
    		
    		private static ThreadPoolAndQueueWrapper instance;
    		
    		static {
    			instance = new ThreadPoolAndQueueWrapper();
    		}
    		
    		public static ThreadPoolAndQueueWrapper getInstance(a) {
    			returninstance; }}}Copy the code
  • A RequestQueue encapsulates the RequestQueue, which holds an internal collection of request queues and provides a method for adding queues

    package com.roncoo.eshop.inventory.request;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    
    /** * request memory queue encapsulation *@author Administrator
     *
     */
    public class RequestQueue {
    
    	/** * memory queue, which is a collection. Because concurrency is involved, ArrayBlockingQueeue is used, and the queue holds requests (read and write) */
    	private List<ArrayBlockingQueue<Request>> queues = new ArrayList<ArrayBlockingQueue<Request>>();
    	
    	public static RequestQueue getInstance(a) {
    		return Singleton.getInstance();
    	}
    	
    	/** * Add a memory queue *@param queue
    	 */
    	public void addQueue(ArrayBlockingQueue<Request> queue) {
    		this.queues.add(queue);
    	}
    	
    	/** * Internal static class way to ensure absolute thread safety *@author Administrator
    	 *
    	 */
    	private static class Singleton {
    		
    		private static RequestQueue instance;
    		
    		static {
    			instance = new RequestQueue();
    		}
    		
    		public static RequestQueue getInstance(a) {
    			returninstance; }}}Copy the code
  • Worker threads that need to be submitted to the thread pool to process Request requests. And holds its own memory queue

    package com.roncoo.eshop.inventory.thread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    
    import com.roncoo.eshop.inventory.request.Request;
    
    /** * The worker thread that executes the request *@author Administrator
     *
     */
    public class RequestProcessThread implements Callable<Boolean>{
    
    	/** * self-monitoring memory queue */
    	private ArrayBlockingQueue<Request> queue;
    	
    	public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
    		this.queue = queue;
    	}
    	
    	/** ** Specific workflow */
    	@Override
    	public Boolean call(a) throws Exception {
    		while(true) {
    			break;
    		}
    		return true; }}Copy the code
  • Request encapsulation Request is an interface. Later read and write requests need to implement this interface to carry out their own operation logic

    /** * request interface, read request and write request to implement this interface *@author Administrator
     *
     */
    public interface Request {}Copy the code

The project structure

3.2 Encapsulation of two request objects

  • Create a new entity class ProductInventory

    public class ProductInventory {
    
    	/** * product id */
    	private Integer productId;
    	/** ** inventory */
    	private Long inventoryCnt;
    	
    	public ProductInventory(a) {}public ProductInventory(Integer productId, Long inventoryCnt) {
    		this.productId = productId;
    		this.inventoryCnt = inventoryCnt;
    	}
    	
    	public Integer getProductId(a) {
    		return productId;
    	}
    	public void setProductId(Integer productId) {
    		this.productId = productId;
    	}
    	public Long getInventoryCnt(a) {
    		return inventoryCnt;
    	}
    	public void setInventoryCnt(Long inventoryCnt) {
    		this.inventoryCnt = inventoryCnt; }}Copy the code
  • Add a business method to the Request interface

    public interface Request {
    
    	void process(a);
    }
    Copy the code
  • Inventory ProductInventroyWriteRequest written request

    package com.roncoo.eshop.inventory.request;
    
    import com.roncoo.eshop.inventory.model.ProductInventory;
    import com.roncoo.eshop.inventory.service.IProductInventoryService;
    
    /** * Cache Aside Pattern * 1. Delete cache * 2. Update database *@author Administrator
     *
     */
    public class ProductInventoryWriteRequest implements Request{
    	
    	private ProductInventory productInventory;
    	
    	private IProductInventoryService productInventoryService;
    	
    	public ProductInventoryWriteRequest(ProductInventory productInventory, IProductInventoryService productInventoryService) {
    		this.productInventory = productInventory;
    		this.productInventoryService = productInventoryService;
    	}
    	
    	public void process(a) {
    		1. Delete the cache
    		productInventoryService.removeCache(productInventory);
    		// update the databaseproductInventoryService.updateDb(productInventory); }}Copy the code
  • Inventory ProductInventroyReadRequest read request

    package com.roncoo.eshop.inventory.request;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.roncoo.eshop.inventory.model.ProductInventory;
    import com.roncoo.eshop.inventory.service.IProductInventoryService;
    
    /** * commodity inventory read request * 1. Query database * 2@author Administrator
     *
     */
    public class ProductInventoryReadRequest implements Request{
    
    	/** * product Id */
    	private Integer productId;
    	
    	@Autowired
    	private IProductInventoryService productInventoryService;
    	
    	public ProductInventoryReadRequest(Integer productId, IProductInventoryService productInventoryService) {
    		this.productId = productId;
    		this.productInventoryService = productInventoryService;
    	}
    	
    	@Override
    	public void process(a) {
    		//1. Query the latest inventory from the database
    		ProductInventory productInventory = productInventoryService.findProductInventoryByProductId(productId);
    		//2. Set the inventory to redis cacheproductInventoryService.setProductInventoryToCache(productInventory); }}Copy the code

Project Structure:

3.3 Service encapsulation for Asynchronous execution of requests

The operation in this step is to route incoming requests to the corresponding memory queue according to the item ID. The accepted argument is the request.

Service is not a good name. It is better to call it interface routing proxy

  • The service interface RequestAsyncServiceImpl

    package com.roncoo.eshop.inventory.service.impl;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    import org.springframework.stereotype.Service;
    
    import com.roncoo.eshop.inventory.request.Request;
    import com.roncoo.eshop.inventory.request.RequestQueue;
    import com.roncoo.eshop.inventory.service.RequestAsyncService;
    
    /** * Asynchronous service * 1. Route requests to different memory queues * 2. Place the request on a memory queue@author Administrator
     *
     */
    @Service("requestAsyncService")
    public class RequestAsyncServiceImpl implements RequestAsyncService{
    
    	@Override
    	public void process(Request request) {
    		try {
    			// Route the request to the memory queue according to the item ID of each request
    			ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
    			queue.put(request);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch blocke.printStackTrace(); }}private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId){
    		RequestQueue requestQueue = RequestQueue.getInstance();
    		// Get the hash value of productId
    		String key = String.valueOf(productId);
    		int h;
    		int hash = (key == null)?0 : (h = key.hashCode()) ^ (h >>> 16);
    		// Modulates the hash and routes the hash value to the specified memory queue
    		// The memory queue size is 8
    		// When modulating the hash value with the number of memory queues, the result must be between 0 and 7
    		// Any item ID will be fixed to the same memory queue
    		int index = (requestQueue.queueSize() - 1) & hash;
    		returnrequestQueue.getQueue(index); }}Copy the code

3.4 Code changes to worker threads that handle specific requests

public class RequestProcessThread implements Callable<Boolean>{

	/** * self-monitoring memory queue */
	private ArrayBlockingQueue<Request> queue;
	
	public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
		this.queue = queue;
	}
	
	/** ** Specific workflow */
	@Override
	public Boolean call(a) throws Exception {
		while(true) {
			// Retrieve the request from the memory queue that you monitor
			Request request = queue.take();
			// Perform the operation
			request.process();
			break;
		}
		return true; }}Copy the code

Engineering structure:

3.5 Encapsulation of contorller layer

This is mainly a read request, which should be considered as a continuous loop over 200ms to fetch data from the cache. If not within 200ms, go to the database to query

package com.roncoo.eshop.inventory.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.roncoo.eshop.inventory.Response.Response;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.request.ProductInventoryWriteRequest;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
import com.roncoo.eshop.inventory.service.RequestAsyncService;

/** **@author Administrator
 *
 */
@Controller
public class ProductInventoryController {

	@Autowired
	private RequestAsyncService requestAsyncService;
	
	@Autowired
	private IProductInventoryService productInventoryService;

	@RequestMapping("/updateProductInventory")
	@ResponseBody
	public Response updateProductInventory(ProductInventory productInventory) {
		Response response = null;
		
		try {
			Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
			requestAsyncService.process(request);
			response = new Response(Response.SUCCESS);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		return response;
	}
	
	@RequestMapping("/getProductInventory")
	@ResponseBody
	public ProductInventory getProductInventory(Integer productId) {
		ProductInventory productInventory = null;
		
		try {
			Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
			requestAsyncService.process(request);
			
			// After handing the read request to the service for asynchronous processing, it takes a while
			// Wait for the previous inventory update operation while caching the comfortable operation
			// If the waiting time is more than 200ms, then go to the database
			long startTime = System.currentTimeMillis();
			long endTime = 0L;
			long waitTime = 0L;
			
			while(true) {
				if(waitTime > 200) {
					break;
				}
				
				// Try to fetch data from the cache
				productInventory = productInventoryService.getProductInventoryCache(productId);
				// If there is data, return data
				if(productInventory ! =null) {
					return productInventory;
				}
				
				else {
					Thread.sleep(20); endTime = System.currentTimeMillis(); waitTime = endTime - startTime; }}// If not within the specified time (usually 200ms), then try to fetch the database yourself
			productInventory = productInventoryService.findProductInventoryByProductId(productId);
			if(productInventory ! =null) {
				returnproductInventory; }}catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		return new ProductInventory(productId, -1L); }}Copy the code