In distributed systems, Java's synchronized and Lock are no longer sufficient, because they only coordinate threads inside a single JVM. A distributed lock can be implemented with a unique index in MySQL, with Redis SETNX, or with ZooKeeper's unique node paths. The ZooKeeper approach works as follows:

(1) First, create an ephemeral sequential node under the /locks path.

(2) Check whether this node is the smallest one under /locks. If so, the lock is acquired. If not, watch the node immediately before it.

(3) After acquiring the lock, run the business logic, then delete the node you created. The client watching that node is notified and repeats step (2).

The process above is similar to the sync queue in AQS: check whether you are at the head of the queue; if so, acquire the lock, otherwise wait.
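The check in step (2) can be sketched without a ZooKeeper connection. This is a minimal illustration, assuming the child names carry a fixed-width, zero-padded sequence suffix so that plain string sorting matches creation order; `LockQueue` and `predecessor` are hypothetical names, not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: decides whether a node holds the lock or which node it should watch.
class LockQueue {
    /**
     * Returns the child immediately before self in sorted order,
     * or Optional.empty() if self is the smallest child (it holds the lock).
     */
    static Optional<String> predecessor(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown node: " + self);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children =
                Arrays.asList("seq-0000000002", "seq-0000000000", "seq-0000000001");
        System.out.println(predecessor(children, "seq-0000000000")); // Optional.empty
        System.out.println(predecessor(children, "seq-0000000002")); // Optional[seq-0000000001]
    }
}
```

Watching only the immediate predecessor, rather than the whole /locks path, means a release wakes up a single waiter instead of every waiting client.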

1. Use the native ZooKeeper API to implement a distributed lock

Following this approach, we can quickly implement a distributed lock with the ZooKeeper API.

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {
     // List of ZooKeeper Servers
     private String connectString =
                                "192.168.1.128:2181,192.168.1.129:2181,192.168.1.130:2181";
     // The timeout period
     private int sessionTimeout = 2000;
     private ZooKeeper zk;
     private String rootNode = "locks";
     private String subNode = "seq-";
     // The child node that the client is waiting for
     private String waitPath;
     // Latch that blocks until the ZooKeeper connection is established
     private CountDownLatch connectLatch = new CountDownLatch(1);
     // Latch that blocks until the watched node is deleted
     private CountDownLatch waitLatch = new CountDownLatch(1);
     // The child node created by the current client
     private String currentNode;
    
     // Establish a connection with the ZK service and create a root node
	public DistributedLock() throws IOException, InterruptedException, KeeperException {
		zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				// Once the connection is established, release the threads waiting on connectLatch
				if (event.getState() == Event.KeeperState.SyncConnected) {
					connectLatch.countDown();
				}
				// The node at waitPath was deleted
				if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
					waitLatch.countDown();
				}
			}
		});
		// Wait for the connection to be established
		connectLatch.await();
		// Get the root node status
		Stat stat = zk.exists("/" + rootNode, false);
		// If the root node does not exist, create it as a persistent node
		if (stat == null) {
			System.out.println("Root node does not exist");
			zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		}
	}

	// Lock method
	public void zkLock() {
 		try {
			// Create an ephemeral sequential node under the root node; create returns the path of the new node
			currentNode = zk.create("/" + rootNode + "/" + subNode, null,
					ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			// Wait a moment to make the result clearer
			Thread.sleep(10);
 			// Note that there is no need to listen for changes in "/locks" child nodes
                        List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
 			// There is only one child in the list, which must be currentNode
 			if (childrenNodes.size() == 1) {
 				return;
			} else {
				// Sort all ephemeral sequential nodes under the root node in ascending order
 				Collections.sort(childrenNodes);
 				// The current node name
 				String thisNode = currentNode.substring(("/" + rootNode + "/").length());
 				// Get the location of the current node
 				int index = childrenNodes.indexOf(thisNode);
 				if (index == -1) {
 					System.out.println("Abnormal data");
 				} else if (index == 0) {
 					// index == 0, thisNode is the smallest in the list, the current client has the lock
 					return;
 				} else {
					// Get the node immediately before currentNode
					this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
					// Register a watch on waitPath. When waitPath is deleted, ZooKeeper calls the watcher's process method
					zk.getData(waitPath, true, new Stat());
					// Block until the watched node is deleted
					waitLatch.await();
					return;
				}
			}
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	// Unlock method
	public void zkUnlock() {
		try {
			zk.delete(this.currentNode, -1);
		} catch (InterruptedException | KeeperException e) {
			e.printStackTrace();
		}
	}
}

Each client creates an ephemeral sequential node in ZooKeeper and then checks whether its node has the smallest sequence number. If so, it acquires the lock; otherwise it watches the node immediately before its own.

The node is ephemeral because the client may crash after creating it. If the node were persistent, it would never be deleted and every other client would wait forever, which is a deadlock. An ephemeral node is removed automatically when the session of the client that created it ends.
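While the client stays alive, its node is only removed by an explicit delete, so the unlock call should run even when the business logic throws. A minimal sketch of that pattern follows; `SimpleLock` and `withLock` are hypothetical names standing in for the zkLock/zkUnlock pair, and the lock here is a fake that only records calls.

```java
class LockUsage {
    /** Hypothetical minimal lock contract, standing in for zkLock()/zkUnlock(). */
    interface SimpleLock {
        void lock();
        void unlock();
    }

    /** Runs the task while holding the lock and always releases it afterwards. */
    static void withLock(SimpleLock lock, Runnable task) {
        lock.lock();
        try {
            task.run();
        } finally {
            lock.unlock(); // runs even if task throws
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // Fake lock that records the order of calls instead of talking to ZooKeeper.
        SimpleLock fake = new SimpleLock() {
            @Override public void lock() { log.append("lock;"); }
            @Override public void unlock() { log.append("unlock;"); }
        };
        try {
            withLock(fake, () -> { throw new IllegalStateException("business failure"); });
        } catch (IllegalStateException e) {
            log.append("caught;");
        }
        System.out.println(log); // lock;unlock;caught;
    }
}
```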

We use a CountDownLatch to block while watching the previous node. When that node changes, ZooKeeper calls the process method, which calls countDown to release the waiting thread.
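The await/countDown handshake can be tried in isolation. In this sketch a plain thread stands in for ZooKeeper's event thread invoking process; `LatchDemo` and `awaitSimulatedDeletion` are hypothetical names, and the timeout is an addition so the demo cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchDemo {
    /** Returns true if the waiter was released by the simulated callback before the timeout. */
    static boolean awaitSimulatedDeletion(long timeoutMs) {
        CountDownLatch waitLatch = new CountDownLatch(1);
        // Stand-in for ZooKeeper's event thread calling process() on a NodeDeleted event.
        new Thread(waitLatch::countDown).start();
        try {
            // Blocks until countDown() has been called or the timeout elapses.
            return waitLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitSimulatedDeletion(1000)); // true
    }
}
```

A CountDownLatch cannot be reset once it reaches zero, so a lock object built around a single latch field can wait only once; waiting again would need a fresh latch.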

Test

import org.apache.zookeeper.KeeperException;
import java.io.IOException;

public class DistributedLockTest {
 	public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
 		// Create distributed lock 1
 		final DistributedLock lock1 = new DistributedLock();
 		// Create distributed lock 2
 		final DistributedLock lock2 = new DistributedLock();
        
 		new Thread(new Runnable() {
 			@Override
			public void run() {
 				// Get the lock object
 				try {
 					lock1.zkLock();
 					System.out.println("Thread 1 acquires lock");
 					Thread.sleep(5 * 1000);
 					lock1.zkUnlock();
 					System.out.println("Thread 1 releases lock");
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
 			}
 		}).start();
 
		new Thread(new Runnable() {
			@Override
			public void run() {
				// Acquire the lock
				try {
					lock2.zkLock();
					System.out.println("Thread 2 acquires lock");
					Thread.sleep(5 * 1000);
					lock2.zkUnlock();
					System.out.println("Thread 2 releases lock");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();
	}
}

Start two threads to test the lock and check the console output:

Thread 1 acquires lock
Thread 1 releases lock
Thread 2 acquires lock
Thread 2 releases lock

2. Use the Curator framework to implement distributed locking

See the official Curator documentation for details.

Problems with using the native API:

  • The session connection is established asynchronously, so you have to wait for it yourself, for example with a CountDownLatch
  • A watch fires only once and must be registered again after each trigger, otherwise it stops taking effect
  • Development complexity is high
  • Multi-level nodes cannot be created or deleted in one call. You have to recurse yourself
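The last point can be made concrete. With the native API, creating a node whose parent does not exist fails, so each level has to be created in turn. The sketch below only computes that creation order and does not talk to ZooKeeper; `PathChain` and `ancestorsAndSelf` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class PathChain {
    /** Splits "/a/b/c" into ["/a", "/a/b", "/a/b/c"], the order in which the nodes must be created. */
    static List<String> ancestorsAndSelf(String path) {
        if (!path.startsWith("/") || path.length() < 2 || path.endsWith("/")) {
            throw new IllegalArgumentException("Expected an absolute path such as /a/b: " + path);
        }
        List<String> result = new ArrayList<>();
        // Every '/' after the first one marks the end of an ancestor path.
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        result.add(path);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(ancestorsAndSelf("/locks/app/order"));
        // [/locks, /locks/app, /locks/app/order]
    }
}
```

Curator covers this case with builder options such as creatingParentsIfNeeded() on create and deletingChildrenIfNeeded() on delete.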

For these reasons, real-world projects generally use Curator. After all, an existing wheel that is solid and well tested beats building a wobbly one yourself.

Curator implements four main types of locks:

  • InterProcessMutex: distributed reentrant exclusive lock
  • InterProcessSemaphoreMutex: distributed non-reentrant exclusive lock
  • InterProcessReadWriteLock: distributed read-write lock
  • InterProcessMultiLock: container that manages multiple locks as a single entity
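Reentrancy means the thread that already holds the lock can acquire it again without blocking, as long as every acquire is matched by a release. The sketch below shows the idea with the JDK's single-JVM ReentrantLock as an analogy only; it does not use Curator, and `ReentrancyDemo` is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    /** Acquires the same lock twice on one thread and reports the hold count. */
    static int holdCountAfterTwoAcquires() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock(); // second acquire by the owning thread does not block
        try {
            return lock.getHoldCount();
        } finally {
            // Each acquire must be balanced by a release.
            lock.unlock();
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfterTwoAcquires()); // 2
    }
}
```

A non-reentrant lock such as InterProcessSemaphoreMutex does not allow this: a second acquire from the thread that already holds it would wait like any other caller.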

First, add the dependencies to your project:

<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>4.3.0</version>
</dependency>
<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>4.3.0</version>
</dependency>
<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-client</artifactId>
     <version>4.3.0</version>
</dependency>

Then implement the lock:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {
 	private String rootNode = "/locks";
	// ZooKeeper server list
	private String connectString =
			"192.168.1.128:2181,192.168.1.129:2181,192.168.1.130:2181";
 	// Connection Timeout duration
 	private int connectionTimeout = 2000;
 	// Session timeout duration
 	private int sessionTimeout = 2000;
 
        public static void main(String[] args) {
 		new CuratorLockTest().test();
 	}
    
	// Test
	private void test() {
 		// Create distributed lock 1
 		final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
 		// Create distributed lock 2
 		final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
 
                new Thread(new Runnable() {
 			@Override
			public void run() {
 				// Get the lock object
 				try {
 					lock1.acquire();
 					System.out.println("Thread 1 acquires lock");
 					// Test lock reentrant
 					lock1.acquire();
 					System.out.println("Thread 1 acquires the lock again");
 					Thread.sleep(5 * 1000);
 					lock1.release();
 					System.out.println("Thread 1 releases lock");
 					lock1.release();
 					System.out.println("Thread 1 releases the lock again");
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
 			}
 		}).start();
 
                new Thread(new Runnable() {
 			@Override
			public void run() {
 				// Get the lock object
		 		try {
 					lock2.acquire();
 					System.out.println("Thread 2 acquires lock");
 					// Test lock reentrant
 					lock2.acquire();
     				System.out.println("Thread 2 acquires the lock again");
 					Thread.sleep(5 * 1000);
 					lock2.release();
 					System.out.println("Thread 2 releases lock");
 					lock2.release();
 					System.out.println("Thread 2 releases the lock again");
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
 			}
 		}).start();
 	}
 
	// Create and start the Curator client
	public CuratorFramework getCuratorFramework() {
		// Retry policy: base sleep time of 3 seconds, at most 3 retries
		RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
		// Build the Curator client from the factory
		CuratorFramework client = CuratorFrameworkFactory.builder()
				.connectString(connectString)
				.connectionTimeoutMs(connectionTimeout)
				.sessionTimeoutMs(sessionTimeout)
				.retryPolicy(policy).build();
		// Open the connection
		client.start();
		System.out.println("Zookeeper initialization completed...");
		return client;
	}
}

Check the console output:

Thread 1 acquires lock
Thread 1 acquires the lock again
Thread 1 releases lock
Thread 1 releases the lock again
Thread 2 acquires lock
Thread 2 acquires the lock again
Thread 2 releases lock
Thread 2 releases the lock again