preface

We’ve already explained how Redis client Redission implements distributed locking, mostly at the source level.

In distributed system, the common distributed lock implementation scheme and Zookeeper, the next in-depth study of Zookeeper is how to achieve the distributed lock.

They are met

The file system

Zookeeper maintains a data structure similar to a file system

Each subdirectory entry, such as NameService, is called znoed. Like a file system, we can add and delete zNodes freely, and add and delete sub-ZNodes under zNodes. The only difference is that zNodes can store data.

There are four types of ZNodes

  • PERSISTENT– PERSISTENT directory nodes persist even after the client is disconnected from ZooKeeper

  • PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL- PERSISTENT_SEQUENTIAL

  • EPHEMERAL- Temporary Directory node was removed after the client was disconnected from ZooKeeper

  • After the directory node client is disconnected from ZooKeeper, the node is deleted. Zookeeper only numbers the node name sequentially

A notification mechanism

The client registers to listen to the directory nodes it cares about. When the directory node changes (data changes, deletion, addition and deletion of subdirectory nodes), ZooKeeper notifies the client.

A distributed lock

With ZooKeeper’s consistent file system, locking issues are made easier. Locking services can be divided into two categories, one for holding exclusivity and the other for controlling timing.

  1. For the first type, we treat a ZNode on ZooKeeper as a lock, implemented by creating a zNode. All clients create the /distribute_lock node, and the client that is successfully created owns the lock. Delete the distribute_lock node you created and release the lock.

  2. For the second type, /distribute_lock already exists, and all clients create a temporary sequentially numbered directory node under it. As with master, the least numbered directory node obtains the lock and deletes the zNode it has created when it runs out.

Note: Please refer to www.cnblogs.com/dream-to-pk…

First introduction to the Curator framework

Curator is an open source Zookeeper client framework developed by Netflix. Currently available as a top-level project of Apache, Zookeeper is one of the most popular Zookeeper clients.

Let’s take a look at Apache Curator’s website:

Then look at the quick start on distributed lock Address is: curator.apache.org/getting-sta…

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally{ lock.release(); }}Copy the code

Use the InterProcessMutex class to acquire a distributed lock using the acquire() method.

An example of using a distributed lock for Curator

Start two threads T1 and T2 to compete for the lock. The thread that holds the lock takes 5 seconds. As you can see from multiple runs, sometimes T1 gets the lock first and T2 waits, and sometimes the other way around. The Curator will use the node of our provided lock path as a global lock, and the data on this node will look like this: [_C_64E0811F-9475-44CA-AA36-C1DB65AE5350-lock-00000000001], this string will be generated each time the lock is obtained, and the data will be cleared when the lock is released.

Let’s look at an example of locking:

public class Application {
    private static final String ZK_ADDRESS = "192.20.38.58:2181";
    private static final String ZK_LOCK_PATH = "/locks/lock_01";

    public static void main(String[] args) throws InterruptedException {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10.5000)); client.start(); System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock"); }}catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch(Exception e) { e.printStackTrace(); }}}}Copy the code

Running results:

Principle of implementation of Curator locking

Let’s look at the code for locking Curator directly:

public class InterProcessMutex implements InterProcessLock.Revocable<InterProcessMutex> {
	
	private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();

	 private static class LockData
    {
        final Thread        owningThread;
        final String        lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath; }}@Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }


 	private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if( lockData ! =null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if( lockPath ! =null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false; }}Copy the code

Look directly at the internalLock() method, which first fetches the current thread and then checks to see if the current thread is in a concurrentHashMap. This is the implementation of the reentrant lock. If the current thread already fetches the lock, the number of times the thread fetches the lock is increased by 1

AttemptLock () if no lock is acquired, the attemptLock() method is applied to attemptLock(). If the lockPath is not empty, the lock was acquired successfully and the current thread is placed into the map.

Next look at the core lock-logical attemptLock() method:

Parameter: time: time to wait for obtaining locks Unit: time unit lockNodeBytes: the default value is null

public class LockInternals {	
	String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        finalLong millisToWait = (unit ! =null)? unit.toMillis(time) :null;
        final byte[] localLockNodeBytes = (revocable.get() ! =null)?new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                if( localLockNodeBytes ! =null )
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throwe; }}}if ( hasTheLock )
        {
            return ourPath;
        }

        return null; }}Copy the code

ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

If the current machine goes down, the temporary node it created will disappear automatically. If the client that acquired the lock goes down, ZK can guarantee that the lock will be released automatically

Create a data structure similar to:

PS: The figure of 01/02 was not captured. Here, the sequence node of 03/04 as shown in the screenshot is displayed in ZK again

Then focus on the logic of interalLockLoop() :

public class LockInternals {
	private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if( revocable.get() ! =null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while( (client.getState() == CuratorFrameworkState.STARTED) && ! haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() +1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if( stat ! =null )
                        {
                            if( millisToWait ! =null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else{ wait(); }}}// else it may have been deleted (i.e. lock released). Try to acquire again}}}// Omit some code
        returnhaveTheLock; }}Copy the code

The logic in the while loop is to acquire the lock:

  1. To obtain/locks/lock_01The zNode nodes in the following order, as already seen in the picture above, will havexxx01xxx02The two nodes
  2. callgetsTheLock()Method to obtain the lock, wheremaxLeasesIs 1. By default, only one thread can acquire the lock
  3. Locate theStandardLockInternalsDriver.getsTheLock()Method, where the code core is as follows:int ourIndex = children.indexOf(sequenceNodeName); boolean getsTheLock = ourIndex < maxLeases;
  4. The abovesequenceNodeNameParameters forxxx01Is the first element in the children list. If it is the first element, ourIndex=0 is returned, then the lock is considered successful
  5. If is the first element in an ordered list, thenpredicateResults.getsTheLock()True gets the flag bit of the lockhavaTheLockIf the value is true, the system returns success in obtaining the lock

The core code that failed to acquire the lock:

String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this)
{
    Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
    if( stat ! =null )
    {
        if( millisToWait ! =null )
        {
            millisToWait -= (System.currentTimeMillis() - startMillis);
            startMillis = System.currentTimeMillis();
            if ( millisToWait <= 0 )
            {
                doDelete = true;    // timed out - delete our node
                break;
            }

            wait(millisToWait);
        }
        else{ wait(); }}}Copy the code
  1. Add a listener for the last node
  2. If the lock has an expiration time, break to exit the loop
  3. The current thread is in wait() state, waiting for the previous thread to release the lock

Principle for implementing the cache release lock

Releasing the lock is very simple. Delete the current temporary node directly, because the next node is listening to the previous node, so when the previous node is deleted, the current node will wake up to acquire the lock again.

private void deleteOurPath(String ourPath) throws Exception
{
    try
    {
        client.delete().guaranteed().forPath(ourPath);
    }
    catch ( KeeperException.NoNodeException e )
    {
        // ignore - already deleted (possibly expired session, etc.)}}Copy the code

conclusion

Summary in one chart:

The original image to view my sharing: www.processon.com/view/link/5…

statement

This article first from my blog: www.cnblogs.com/wang-meng and public number: a flower count is not romantic, if reprinted please indicate the source!

Interested partners can pay attention to personal public account: a flower is not romantic