- Distributed lock can be implemented in a variety of ways, such as through the database, Redis can be achieved. As a distributed collaboration tool, ZooKeeper, of course, also has a standard implementation.
- Design ideas
- Each client goes to
/Locks
Create temporary ordered nodes /Locks/Lock_00000x - Client fetch
/Locks
If the lock node is in the first place, it means that the lock is successfully obtained - If its own lock node is not the first, it listens on its previous lock node. For example, if you Lock Lock 000000001, you listen on the previous Lock Lock 000000000
- Listen for the client to re-execute step 2 logic to determine if it has acquired the lock
- Once the lock is acquired, you can perform your own operations. After executing, delete your own lock node.
1. Experiment code
public class DistributedLock {
private String ip = null; // IP address and port
private static Integer timeOut = 5000; // Timeout in milliseconds
private final static CountDownLatch countDownLatch = new CountDownLatch(1);
private final static Logger log = Logger.getLogger(DistributedLock.class);
private ZooKeeper zooKeeper = null;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
String lockPath; // Lock the path of the node
public DistributedLock(a) {}public DistributedLock(String ip) {
this(ip, timeOut);
}
public DistributedLock(String ip, Integer timeOut) {
this.ip = ip;
DistributedLock.timeOut = timeOut;
try {
initZK(ip, timeOut);
} catch(IOException | InterruptedException e) { e.printStackTrace(); }}private void initZK(String ip, Integer timeOut) throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(ip, timeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
log.info("Connection successful"); countDownLatch.countDown(); }}}); countDownLatch.await(); }public void release(a) {
try {
zooKeeper.delete(lockPath, -1); // Delete this node
zooKeeper.close(); // Close the client
log.info("Released" + this.lockPath);
} catch(InterruptedException | KeeperException e) { e.printStackTrace(); }}/** * need to lock, if not to itself, then block etc */
public void acquire(a) {
try {
createLock(); // Create a lock node
attemptLock(); // Request a node, if not, block
} catch(KeeperException | InterruptedException e) { e.printStackTrace(); }}/** * create your own number */
private void createLock(a) throws KeeperException, InterruptedException {
if (zooKeeper.exists(LOCK_ROOT_PATH, true) = =null) {
zooKeeper.create(LOCK_ROOT_PATH, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
log.info("Create parent node:" + LOCK_ROOT_PATH + "Success");
}
lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("Node created successfully:" + lockPath);
}
/** * A monitor that monitors the deletion of the previous node */
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
synchronized (this) { notifyAll(); }}}};/** * get the lock. If it is not your turn, wait() until the previous node is deleted to wake you */
private void attemptLock(a) throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren(LOCK_ROOT_PATH, null);
Collections.sort(children);
int index = children.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
if (index == 0) {
log.info("Lock obtained successfully");
return;
}
String lastPath = children.get(index - 1); // Get the path of the last node
// Add a monitor that monitors the previous node
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + lastPath, watcher);
if (stat == null) { // If it is empty, it has been deleted
attemptLock();
return;
}
synchronized(watcher) { watcher.wait(); } attemptLock(); }}Copy the code
public class DistributedLockTest {
public static void main(String[] args) {
DistributedLock lock = new DistributedLock("192.168.233.133:2181");
lock.acquire();
try {
Thread.sleep(20000);
} catch(InterruptedException e) { e.printStackTrace(); } lock.release(); }}Copy the code