Preface
We have already explained, mostly at the source-code level, how the Redis client Redisson implements distributed locking.
In distributed systems, ZooKeeper is the other common way to implement a distributed lock, so next we take an in-depth look at how ZooKeeper achieves it.
Getting to know ZooKeeper
The file system
ZooKeeper maintains a data structure similar to a file system.
Each subdirectory entry, such as NameService, is called a znode. Just like in a file system, we can freely add and remove znodes, and add and remove child znodes under a znode. The only difference is that a znode can store data.
There are four types of znodes:
- PERSISTENT: a persistent directory node; it remains even after the client disconnects from ZooKeeper.
- PERSISTENT_SEQUENTIAL: a persistent sequential directory node; it remains after the client disconnects, and ZooKeeper appends a monotonically increasing sequence number to the node name.
- EPHEMERAL: an ephemeral directory node; it is deleted once the client disconnects from ZooKeeper.
- EPHEMERAL_SEQUENTIAL: an ephemeral sequential directory node; it is deleted once the client disconnects, and ZooKeeper appends a sequence number to the node name.
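To make the four types concrete, here is a minimal sketch of creating one node of each type. It is my own illustration rather than part of the original post: it assumes an already-started Curator client (the same client library used later in this article) and invented paths under /demo.

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

public class ZnodeTypesDemo {
    // Assumes client is an already-started CuratorFramework instance
    static void createOneOfEach(CuratorFramework client) throws Exception {
        // persistent: survives client disconnects
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath("/demo/persistent", "data".getBytes());
        // persistent sequential: survives disconnects, name gets a suffix such as /demo/pseq-0000000001
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/demo/pseq-");
        // ephemeral: removed automatically when the creating session ends
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL).forPath("/demo/ephemeral");
        // ephemeral sequential: the combination the lock recipe below relies on
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/demo/eseq-");
    }
}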
A notification mechanism
The client registers to listen on the directory nodes it cares about. When a directory node changes (its data changes, it is deleted, or child nodes are added or removed), ZooKeeper notifies the client.
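As a rough sketch (again my own, not from the original post; the /app/config path is just an illustration and client is an already-started CuratorFramework instance), registering such listeners with Curator looks roughly like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.Watcher;

public class WatchDemo {
    static void watchNode(CuratorFramework client) throws Exception {
        Watcher watcher = event ->
                System.out.println("znode event: " + event.getType() + " on " + event.getPath());
        // one-shot watch on the node's data: fires on the next data change or deletion
        client.getData().usingWatcher(watcher).forPath("/app/config");
        // one-shot watch on the node's children: fires when a child is added or removed
        client.getChildren().usingWatcher(watcher).forPath("/app/config");
    }
}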
A distributed lock
With ZooKeeper’s consistent file system, the locking problem becomes easier. Lock services can be divided into two categories: one that keeps exclusive ownership, and one that controls the ordering of access.
- For the first category, we treat a znode on ZooKeeper as the lock and implement it by creating that znode. All clients try to create the /distribute_lock node, and the client that creates it successfully owns the lock. To release the lock, the owner deletes the /distribute_lock node it created (a minimal sketch follows after the note below).
- For the second category, /distribute_lock already exists, and all clients create ephemeral sequential nodes under it. As with master election, the client whose node has the smallest sequence number obtains the lock, and it deletes the znode it created when it is done.
Note: Please refer to www.cnblogs.com/dream-to-pk…
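Here is a minimal sketch of the first category under my own assumptions (it is not how Curator implements its lock): the node is created as an ephemeral node so that a crashed holder cannot leave the lock stuck forever, and a NodeExistsException simply means someone else already holds it.

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

public class ExclusiveLockSketch {
    private static final String LOCK_NODE = "/distribute_lock";

    // Returns true if this client now owns the lock
    static boolean tryLock(CuratorFramework client) throws Exception {
        try {
            // ephemeral, so the lock disappears automatically if this client crashes
            client.create().withMode(CreateMode.EPHEMERAL).forPath(LOCK_NODE);
            return true;
        } catch (KeeperException.NodeExistsException e) {
            return false; // another client created the node first and holds the lock
        }
    }

    static void unlock(CuratorFramework client) throws Exception {
        client.delete().forPath(LOCK_NODE);
    }
}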
First introduction to the Curator framework
Curator is an open-source ZooKeeper client framework originally developed by Netflix. It is now a top-level Apache project and one of the most popular ZooKeeper clients.
Let’s take a look at the Apache Curator website, and then at the quick start for the distributed lock recipe. The address is: curator.apache.org/getting-sta…
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}
The InterProcessMutex class is used here: its acquire() method obtains the distributed lock, and release() releases it.
An example of using a Curator distributed lock
Start two threads, t1 and t2, that compete for the lock. The thread that acquires the lock holds it for 5 seconds. Running the program several times, you can see that sometimes t1 gets the lock first and t2 waits, and sometimes it is the other way around. Curator uses the lock path we provide as the global lock, and the data under this node looks like this: [_C_64E0811F-9475-44CA-AA36-C1DB65AE5350-lock-00000000001]. Such an entry is generated each time the lock is acquired and cleared when the lock is released.
Let’s look at an example of locking:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.TimeUnit;

public class Application {
    private static final String ZK_ADDRESS = "192.20.38.58:2181";
    private static final String ZK_LOCK_PATH = "/locks/lock_01";

    public static void main(String[] args) throws InterruptedException {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000));
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");
        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            // wait up to 10 seconds for the lock; release only if it was actually acquired
            if (lock.acquire(10 * 1000, TimeUnit.MILLISECONDS)) {
                try {
                    System.out.println(Thread.currentThread().getName() + " hold lock");
                    Thread.sleep(5000L);
                    System.out.println(Thread.currentThread().getName() + " release lock");
                } finally {
                    lock.release();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Running results:
How Curator implements locking
Let’s look directly at Curator’s locking code:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */
        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
}
Look directly at the internalLock() method. It first gets the current thread and then checks whether that thread is already present in the threadData ConcurrentHashMap. This is how reentrancy is implemented: if the current thread already holds the lock, its lock count is simply incremented by 1.
If the thread does not hold the lock yet, attemptLock() is called to try to acquire it. If the returned lockPath is not null, the lock was acquired successfully and the current thread is put into the map.
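A small sketch of that reentrant behaviour (my own snippet, reusing the client and ZK_LOCK_PATH from the example above; the comments describe what the threadData map does internally):

static void reentrantDemo(CuratorFramework client) throws Exception {
    InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
    lock.acquire();      // first acquire: creates the lock znode, lockCount = 1
    lock.acquire();      // same thread re-enters: no ZooKeeper call, lockCount = 2
    try {
        // critical section
    } finally {
        lock.release();  // lockCount drops to 1, the znode is kept
        lock.release();  // lockCount drops to 0, the znode is deleted, the lock is really released
    }
}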
Next, look at the core locking logic in the attemptLock() method:
Parameters:
- time: how long to wait to acquire the lock
- unit: the time unit
- lockNodeBytes: null by default
public class LockInternals
{
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                if ( localLockNodeBytes != null )
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
}
The key call is:
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
It creates an ephemeral sequential node. If the current machine goes down, the ephemeral node it created disappears automatically, so even if the client that acquired the lock crashes, ZooKeeper guarantees that the lock is eventually released.
The data structure created looks similar to this:
PS: The screenshot of sequence nodes 01/02 was not captured in time, so the screenshot here shows the 03/04 sequence nodes created in ZooKeeper afterwards.
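If you want to reproduce this yourself instead of relying on the screenshot, a small snippet like the following (my own, assuming the client and ZK_LOCK_PATH from the earlier example, plus java.util.List and java.util.Collections imports) lists the lock node's children while a lock is held:

static void dumpLockNodes(CuratorFramework client) throws Exception {
    List<String> children = client.getChildren().forPath(ZK_LOCK_PATH);
    Collections.sort(children);
    // prints entries like _c_xxxx...-lock-0000000003, one per client holding or waiting for the lock
    children.forEach(System.out::println);
}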
Then focus on the logic of internalLockLoop():
public class LockInternals
{
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String> children = getSortedChildren();
                String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( stat != null )
                        {
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                    }
                    // else it may have been deleted (i.e. lock released). Try to acquire again
                }
            }
        }
        // ... some code omitted here (catch / finally)
        return haveTheLock;
    }
}
The logic in the while loop is to acquire the lock:
- Get the znodes under /locks/lock_01 in sorted order; as seen in the picture above, there will be two nodes such as xxx01 and xxx02.
- Call getsTheLock() to try to obtain the lock, where maxLeases is 1, meaning that by default only one thread can hold the lock at a time.
- In StandardLockInternalsDriver.getsTheLock(), the core code is:
  int ourIndex = children.indexOf(sequenceNodeName);
  boolean getsTheLock = ourIndex < maxLeases;
- The sequenceNodeName parameter, for example xxx01, is looked up in the children list. If it is the first element, ourIndex is 0 and acquiring the lock is considered successful.
- If our node is the first element of the ordered list, predicateResults.getsTheLock() is true, the haveTheLock flag is set to true, and the method returns success in obtaining the lock. (A standalone illustration of this index check follows this list.)
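To make the index check concrete, here is a standalone, self-contained illustration (the node names are made up; maxLeases is 1, just as InterProcessMutex uses):

import java.util.Arrays;
import java.util.List;

public class GetsTheLockDemo {
    public static void main(String[] args) {
        // children as returned by getSortedChildren(); the names here are invented
        List<String> children = Arrays.asList(
                "_c_aaa-lock-0000000001",
                "_c_bbb-lock-0000000002");
        String sequenceNodeName = "_c_bbb-lock-0000000002"; // the node we created
        int maxLeases = 1;                                   // only one lock holder at a time

        int ourIndex = children.indexOf(sequenceNodeName);
        boolean getsTheLock = ourIndex < maxLeases;
        System.out.println(getsTheLock); // false: node 0000000001 is ahead of us, so we must wait
    }
}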
The core code for the case where the lock is not acquired:
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this)
{
    Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
    if ( stat != null )
    {
        if ( millisToWait != null )
        {
            millisToWait -= (System.currentTimeMillis() - startMillis);
            startMillis = System.currentTimeMillis();
            if ( millisToWait <= 0 )
            {
                doDelete = true;    // timed out - delete our node
                break;
            }

            wait(millisToWait);
        }
        else
        {
            wait();
        }
    }
}
- A watcher is registered on the previous node (the one with the next-smaller sequence number).
- If the wait time for the lock runs out (millisToWait <= 0), our node is marked for deletion and the loop is exited with break.
- Otherwise the current thread calls wait() and blocks until the previous node's owner releases the lock (a simplified sketch of this wait/notify pattern follows).
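The following is a simplified, self-contained sketch of that wait-and-watch pattern. It is not the exact Curator source: in Curator the monitor object is the LockInternals instance itself, and its watcher field ultimately calls notifyAll() on it.

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

public class WaitForPreviousNode {
    private final Object monitor = new Object();

    void waitForPrevious(CuratorFramework client, String previousSequencePath) throws Exception {
        Watcher watcher = event -> {
            synchronized (monitor) {
                monitor.notifyAll();   // previous node changed (normally: deleted), wake up the waiter
            }
        };
        synchronized (monitor) {
            // register the watch; if the previous node is already gone, do not wait at all
            Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
            if (stat != null) {
                monitor.wait();        // parked here until the watcher fires
            }
        }
    }
}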
How Curator releases the lock
Releasing the lock is very simple: the current ephemeral node is deleted directly. Because each node watches the node before it, when the previous node is deleted the current node is woken up and tries to acquire the lock again.
private void deleteOurPath(String ourPath) throws Exception
{
    try
    {
        client.delete().guaranteed().forPath(ourPath);
    }
    catch ( KeeperException.NoNodeException e )
    {
        // ignore - already deleted (possibly expired session, etc.)
    }
}
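For completeness, the release path in InterProcessMutex.release() roughly does the following. This is a paraphrased sketch based on the reentrancy described above, not a verbatim copy of the source: decrement the per-thread lock count, and only when it reaches zero delete the znode and remove the thread from the map.

// Paraphrased sketch of InterProcessMutex.release(), not the verbatim source
public void release() throws Exception {
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if (lockData == null) {
        // this thread never acquired the lock
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
    int newLockCount = lockData.lockCount.decrementAndGet();
    if (newLockCount > 0) {
        return;                 // still re-entered, keep holding the lock
    }
    try {
        internals.releaseLock(lockData.lockPath);   // deletes the ephemeral sequential node
    } finally {
        threadData.remove(currentThread);           // forget this thread's lock data
    }
}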
Conclusion
Summary in one chart:
To view the original image, see my share at: www.processon.com/view/link/5…
Statement
This article first appeared on my blog: www.cnblogs.com/wang-meng and on my WeChat public account: a flower count is not romantic. If you reprint it, please indicate the source!
If you are interested, you are welcome to follow my personal public account: a flower count is not romantic