Zookeeper client selection

  • The native ZooKeeper client has several drawbacks: watchers are one-shot (they must be re-registered after they fire) and there is no automatic reconnection after a session timeout
  • ZkClient solves some of the native client's problems; it is still found in older systems
  • Curator wraps common application scenarios (distributed locks, counters, etc.) and is the preferred choice for new projects; a minimal connection sketch follows this list
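
For reference, a minimal sketch of building a Curator client; the address, timeouts and retry policy below are assumptions, not values from the original project:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorClientDemo {
    public static void main(String[] args) {
        // assumed local ZooKeeper address; replace with your own ensemble
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(30_000)
                .connectionTimeoutMs(3_000)
                // retry with exponentially increasing sleep, at most 3 attempts
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        // ... use the client, e.g. client.getChildren().forPath("/") ...
        client.close();
    }
}
```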

Distributed lock usage scenario

In a single-node project, a JVM lock is enough, but in a microservice or distributed environment the same service may be deployed on several servers, and an ordinary JVM lock cannot synchronize threads across multiple JVMs, so a distributed lock is needed to acquire and release the lock. For example, in an order service we generate order numbers from the current date and time; two instances can hit the same timestamp and produce duplicate order numbers.
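
As a purely illustrative sketch (this class is hypothetical, not from the project), a date-based order number generator whose `synchronized` keyword only serializes threads inside one JVM; two instances of the order service can still emit the same number, which is exactly the gap a distributed lock closes:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class OrderNumberGenerator {
    private static int counter = 0;

    // synchronized only serializes threads inside this JVM;
    // two service instances can still produce the same order number
    public static synchronized String nextOrderNo() {
        String prefix = LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
        counter = (counter + 1) % 1000;
        return prefix + String.format("%03d", counter);
    }
}
```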

Zookeeper Distributed lock implementation principles

  • ZooKeeper guarantees that at most one client can create a given node at any time, and this exclusivity is what implements the lock. An ephemeral node exists only for the lifetime of the session that created it and is removed automatically when the session ends, so a crashed client cannot hold the lock forever.
  • The watcher mechanism: when the node representing the lock is deleted, a watcher fires, the waiting client unblocks and tries to acquire the lock again. This is a major advantage of ZooKeeper distributed locks over other schemes; a minimal sketch with the native client follows this list.
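
A minimal sketch of these two primitives using the native ZooKeeper client (the full ZkClient-based implementation follows below); the path, address and timeout here are assumptions:

```java
import org.apache.zookeeper.*;

public class NativeZkLockSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 8000, event -> { });
        try {
            // only one client can create /lockPath; EPHEMERAL means the node
            // is removed automatically when this session ends
            zk.create("/lockPath", new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("lock acquired");
        } catch (KeeperException.NodeExistsException e) {
            // lock is held by someone else: watch the node and retry
            // when a NodeDeleted event fires
            zk.exists("/lockPath", event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    System.out.println("lock released, try again");
                }
            });
        }
    }
}
```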

Scheme based on ephemeral nodes

The logic of this first scheme is simple: whichever client creates the node successfully holds the lock, and clients that fail to create it block. If thread A holds the lock, thread B fails to create the node and blocks, registering a watcher on /lockPath at the same time. When thread A finishes its work it deletes the node, the watcher fires, and thread B unblocks and tries to acquire the lock again.

Ephemeral node

We mirror the lock interface design of the JDK and use the template method pattern to write the distributed lock. The advantage is extensibility: we can quickly switch to other implementations such as a Redis-based or database-based distributed lock.

```java
public interface Lock {
    /** Acquire the lock */
    void getLock() throws Exception;

    /** Release the lock */
    void unlock() throws Exception;
}
```

AbstractTemplateLock implements Lock:

```java
public abstract class AbstractTemplateLock implements Lock {

    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + " lock obtained successfully");
        } else {
            // wait until the lock is released
            waitLock();
            // then try to acquire it again
            getLock();
        }
    }

    protected abstract void waitLock();

    protected abstract boolean tryLock();

    protected abstract void releaseLock();

    @Override
    public void unlock() {
        releaseLock();
    }
}
```

ZooKeeper distributed lock logic:

```java
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.concurrent.CountDownLatch;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;

    private static final String lockPath = "/lockPath";

    private ZkClient client;

    public ZkTemplateLock() {
        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
        log.info("Zk client connection successful: {}", zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);

        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Listen for deletion of node"); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception {} }; / / complete watcher registered client. SubscribeDataChanges (lockPath, listener); // block yourselfif(client.exists(lockPath)) { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); }} / / cancel the watcher registered client. UnsubscribeDataChanges (lockPath, listener); } @Override protected booleantryLock() {
        try {
            client.createEphemeral(lockPath);
            System.out.println(Thread.currentThread().getName()+"Lock obtained");
        } catch (Exception e) {
            log.error("Create failed");
            return false;
        }
        return true;
    }

    @Override
    public void releaseLock() {
        client.delete(lockPath);
    }
}
```
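
Below is a hypothetical test class (not part of the original source) showing how several threads would contend for the lock through the template interface; each thread builds its own ZkTemplateLock and therefore its own ZooKeeper session:

```java
public class ZkTemplateLockTest {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                Lock lock = new ZkTemplateLock();
                try {
                    lock.getLock();
                    // critical section, e.g. generate an order number
                    System.out.println(Thread.currentThread().getName() + " doing work");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "worker-" + i).start();
        }
    }
}
```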

Disadvantages:

Every time the lock is contended, only one thread gets it. When the number of threads is large, a "thundering herd" occurs and the ZooKeeper server may slow down or even go down. The reason is that every thread that fails to acquire the lock watches the /lockPath node; when thread A releases it, all of those threads wake up at the same time and fight for the lock again. This operation is expensive and performs poorly.

Scheme based on ephemeral sequential nodes

The difference from plain ephemeral nodes is that ephemeral sequential nodes are ordered. We can use this property so that each thread watches only the node immediately before its own. When acquiring the lock, a thread checks whether its sequence number is the smallest; if so, it holds the lock. When it finishes, it deletes its own node, and the thread with the next smallest number becomes the new holder.

Sequential nodes

Ephemeral sequential nodes

Ephemeral sequential node source code

```java
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";
    private String beforePath;
    private String currentPath;
    private ZkClient client;

    public ZkSequenTemplateLock() {
        client = new ZkClient(zkServers);
        if (!client.exists(lockPath)) {
            client.createPersistent(lockPath);
        }
        log.info("Zk client connection successful: {}", zkServers);

    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Listen for deletion of node"); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception {} }; / / add data to row in front of the node delete watcher, essence is to start another thread to listen on a node client. SubscribeDataChanges (beforePath, listener); // block yourselfif (client.exists(beforePath)) {
            try {
                System.out.println("Block"+currentPath); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); }} / / cancel the watcher registered client. UnsubscribeDataChanges (beforePath, listener); } @Override protected booleantryLock() {
        if(the currentPath = = null) {/ / currentPath = client creates a temporary order node. CreateEphemeralSequential (lockPath +"/"."lock-data");
            System.out.println("current:"+ currentPath); } // Get all the children and sort them. List<String> childrens = client.getChildren(lockPath); // Sort list, sort collections.sort (childrens);if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
            return true;
        } else {
            // the current node is not the first one: find the node just
            // before it and assign it to beforePath
            int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
            beforePath = lockPath + "/" + childrens.get(curIndex - 1);
        }
        System.out.println("beforePath"+beforePath);
        return false;
    }

    @Override
    public void releaseLock() {
        System.out.println("delete:" + currentPath);
        client.delete(currentPath);
    }
}
```

Curator distributed lock tool

Curator provides the following types of locks:

  • Shared Reentrant Lock: a globally synchronized lock; no two clients hold it at the same time
  • Shared Lock: similar to the Shared Reentrant Lock, but not reentrant (which can sometimes lead to deadlocks)
  • Shared Reentrant Read Write Lock
  • Shared Semaphore
  • Multi Shared Lock

We use the InterProcessMutex of the first type, **Shared Reentrant Lock**, to acquire and release the lock.

```java
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkLockWithCuratorTemplate implements Lock {
    // zk host address
    private String host = "localhost";
    // zk node path used for the lock
    private String lockPath = "/curatorLock";
    // initial wait time between retries
    private static final int SLEEP_TIME_MS = 1000;
    // maximum number of retries
    private static final int MAX_RETRIES = 1000;
    // session timeout
    private static final int SESSION_TIMEOUT = 30 * 1000;
    // connection timeout
    private static final int CONNECTION_TIMEOUT = 3 * 1000;
    // curator client
    private CuratorFramework curatorFramework;

    InterProcessMutex lock;

    public ZkLockWithCuratorTemplate() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(host)
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        curatorFramework.start();
        lock = new InterProcessMutex(curatorFramework, lockPath);
    }

    @Override
    public void getLock() throws Exception {
        // wait at most 5 seconds to acquire the lock
        lock.acquire(5, TimeUnit.SECONDS);
    }

    @Override
    public void unlock() throws Exception {
        lock.release();
    }
}
```
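
One detail the template above hides: `InterProcessMutex.acquire(time, unit)` returns a boolean indicating whether the lock was obtained within the timeout, so idiomatic usage checks the result and releases in a `finally` block. A minimal standalone sketch under that assumption, with a hypothetical class name and a local ZooKeeper address:

```java
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockUsage {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex mutex = new InterProcessMutex(client, "/curatorLock");
        // acquire with a timeout returns false if the lock was not obtained in time
        if (mutex.acquire(5, TimeUnit.SECONDS)) {
            try {
                System.out.println("lock acquired, doing work");
            } finally {
                mutex.release();
            }
        }
        client.close();
    }
}
```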

Source code and test class address

Github.com/Motianshi/d…