Preface
This article looks at LeaderLatch, the leader election recipe in Curator recipes.
Example
@Test
public void testCuratorLeaderLatch() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();
    String leaderLockPath = "/leader-lock2";
    List<LeaderLatch> latchList = IntStream.rangeClosed(1, 10)
            .parallel()
            .mapToObj(i -> new LeaderLatch(client, leaderLockPath, "client" + i))
            .collect(Collectors.toList());
    latchList.parallelStream()
            .forEach(latch -> {
                try {
                    latch.start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
    TimeUnit.SECONDS.sleep(5);
    Iterator<LeaderLatch> iterator = latchList.iterator();
    while (iterator.hasNext()) {
        LeaderLatch latch = iterator.next();
        if (latch.hasLeadership()) {
            System.out.println(latch.getId() + " hasLeadership");
            try {
                latch.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            iterator.remove();
        }
    }
    TimeUnit.SECONDS.sleep(5);
    latchList.stream()
            .filter(latch -> latch.hasLeadership())
            .forEach(latch -> System.out.println(latch.getId() + " hasLeadership"));
    Participant participant = latchList.get(0).getLeader();
    System.out.println(participant);
    TimeUnit.MINUTES.sleep(15);
    latchList.stream()
            .forEach(latch -> {
                try {
                    latch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
    client.close();
}
- ZkCli query
[zk: localhost:2181(CONNECTED) 17] ls /
[leader-lock1, leader-lock2, zookeeper, leader-lock]
[zk: localhost:2181(CONNECTED) 18] ls /leader-lock2
[_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]
LeaderLatch.start
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Add this instance to the leadership election and attempt to acquire leadership.
 *
 * @throws Exception errors
 */
public void start() throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                internalStart();
            }
            finally
            {
                startTask.set(null);
            }
        }
    }));
}

private synchronized void internalStart()
{
    if ( state.get() == State.STARTED )
    {
        client.getConnectionStateListenable().addListener(listener);
        try
        {
            reset();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("An error occurred checking resetting leadership.", e);
        }
    }
}

@VisibleForTesting
void reset() throws Exception
{
    setLeadership(false);
    setNode(null);
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                setNode(event.getName());
                if ( state.get() == State.CLOSED )
                {
                    setNode(null);
                }
                else
                {
                    getChildren();
                }
            }
            else
            {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
- The start method enrolls the instance in the election, and the reset method creates this participant's child node through forPath
- ZKPaths.makePath(latchPath, LOCK_NAME) returns latchPath followed by /latch-, which is /leader-lock2/latch- in the example
- The create call runs in the background; on success, its callback records the created node name and calls getChildren
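The Preconditions.checkState(state.compareAndSet(...)) line at the top of start is what makes a latch single-use. A minimal sketch of that guard, using only the JDK: the LifecycleGuard class and its method names are hypothetical stand-ins, not Curator code, and the close transition follows the same pattern LeaderLatch.close uses.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the latch's lifecycle guard; not Curator code.
class LifecycleGuard {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Mirrors the compareAndSet(LATENT, STARTED) check in LeaderLatch.start().
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    // Mirrors the compareAndSet(STARTED, CLOSED) check in LeaderLatch.close().
    void close() {
        if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
            throw new IllegalStateException("Already closed or has not been started");
        }
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        LifecycleGuard guard = new LifecycleGuard();
        guard.start();
        try {
            guard.start(); // second start fails the compare-and-set
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
        guard.close();
        System.out.println(guard.current());
    }
}
```

Because the transition is a single compare-and-set, two threads racing to start the same latch cannot both succeed, and a closed latch can never go back to STARTED.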
CreateBuilderImpl.forPath
curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java
@VisibleForTesting
static final String PROTECTED_PREFIX = "_c_";

@Override
public String forPath(final String givenPath, byte[] data) throws Exception
{
    if ( compress )
    {
        data = client.getCompressionProvider().compress(givenPath, data);
    }
    final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
    List<ACL> aclList = acling.getAclList(adjustedPath);
    client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);
    String returnPath = null;
    if ( backgrounding.inBackground() )
    {
        pathInBackground(adjustedPath, data, givenPath);
    }
    else
    {
        String path = protectedPathInForeground(adjustedPath, data, aclList);
        returnPath = client.unfixForNamespace(path);
    }
    return returnPath;
}

@VisibleForTesting
String adjustPath(String path) throws Exception
{
    if ( doProtected )
    {
        ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
        String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
        path = ZKPaths.makePath(pathAndNode.getPath(), name);
    }
    return path;
}

private static String getProtectedPrefix(String protectedId)
{
    return PROTECTED_PREFIX + protectedId + "-";
}
- If no namespace was specified when the CuratorFramework was created, client.fixForNamespace returns the path unchanged
- adjustPath handles the doProtected case: it prefixes the node name with PROTECTED_PREFIX and the protectedId (a UUID). For example, the original latch- becomes _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-
- Because the mode is EPHEMERAL_SEQUENTIAL, ZooKeeper then appends a sequence number, giving for example /leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045. The node's data is the id passed to the LeaderLatch
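The naming steps above can be sketched with plain string handling. This is an illustration rather than Curator code: the ProtectedNodeName class and its methods are hypothetical, the prefix logic follows getProtectedPrefix from the source, and withSequence only imitates the zero-padded 10-digit counter that the ZooKeeper server appends to sequential nodes.

```java
import java.util.UUID;

// Hypothetical illustration of how the final node name is composed; not Curator code.
class ProtectedNodeName {
    static final String PROTECTED_PREFIX = "_c_";
    static final String LOCK_NAME = "latch-";

    // Client-side part, as in getProtectedPrefix: prefix + protectedId + "-" + node name.
    static String protectedName(String protectedId, String nodeName) {
        return PROTECTED_PREFIX + protectedId + "-" + nodeName;
    }

    // Server-side part, simulated here: ZooKeeper appends a 10-digit zero-padded counter.
    static String withSequence(String name, int sequence) {
        return name + String.format("%010d", sequence);
    }

    public static void main(String[] args) {
        String protectedId = UUID.randomUUID().toString();
        String name = protectedName(protectedId, LOCK_NAME);
        System.out.println("/leader-lock2/" + withSequence(name, 45));
    }
}
```

The UUID is what lets Curator recognize its own node again if a create succeeds on the server but the response is lost, which is the purpose of withProtection.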
LeaderLatch.getChildren
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
private void getChildren() throws Exception
{
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                checkLeadership(event.getChildren());
            }
        }
    };
    client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}

private void checkLeadership(List<String> children) throws Exception
{
    final String localOurPath = ourPath.get();
    List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
    int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
    if ( ourIndex < 0 )
    {
        log.error("Can't find our node. Resetting. Index: " + ourIndex);
        reset();
    }
    else if ( ourIndex == 0 )
    {
        setLeadership(true);
    }
    else
    {
        String watchPath = sortedChildren.get(ourIndex - 1);
        Watcher watcher = new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                {
                    try
                    {
                        getChildren();
                    }
                    catch ( Exception ex )
                    {
                        ThreadUtils.checkInterrupted(ex);
                        log.error("An error occurred checking the leadership.", ex);
                    }
                }
            }
        };
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    // previous node is gone - reset
                    reset();
                }
            }
        };
        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
        client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
    }
}
- getChildren calls checkLeadership, which sorts the children and looks up this latch's own node. Index 0 is marked as leader; any index greater than 0 sets a watch on the node immediately before it. When that node is deleted, getChildren is triggered again. If the latch's own node is missing from the list, reset is called
- The getData call also registers a background callback: if the result is NONODE, meaning the previous node had already disappeared when the watch was set, reset is triggered
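The three-way decision in checkLeadership can be sketched without ZooKeeper. This is a simplified model, not Curator code: the LeadershipCheck class and the string results are hypothetical, and sorting by the text after the last latch- is an assumption about what LockInternals.getSortedChildren does with the sequence suffix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the checkLeadership decision; not Curator code.
class LeadershipCheck {
    static final String LOCK_NAME = "latch-";

    // Extracts the sequence suffix that follows LOCK_NAME, e.g. "0000000045".
    static String sequenceOf(String child) {
        int index = child.lastIndexOf(LOCK_NAME);
        return index >= 0 ? child.substring(index + LOCK_NAME.length()) : child;
    }

    // Assumption: children are ordered by their zero-padded sequence suffix.
    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LeadershipCheck::sequenceOf));
        return sorted;
    }

    // Returns "RESET", "LEADER", or "WATCH <predecessor>" for the given node name.
    static String decide(List<String> children, String ourNode) {
        List<String> sorted = sortBySequence(children);
        int ourIndex = (ourNode != null) ? sorted.indexOf(ourNode) : -1;
        if (ourIndex < 0) {
            return "RESET";
        }
        if (ourIndex == 0) {
            return "LEADER";
        }
        return "WATCH " + sorted.get(ourIndex - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "_c_b-latch-0000000042", "_c_a-latch-0000000040", "_c_c-latch-0000000041");
        for (String child : children) {
            System.out.println(child + " -> " + decide(children, child));
        }
    }
}
```

Each participant watches only its immediate predecessor, so a leader change wakes a single waiter instead of every participant at once.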
LeaderLatch.close
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Remove this instance from the leadership election. If this instance is the leader, leadership
 * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
 * instances must eventually be closed.
 *
 * @throws IOException errors
 */
@Override
public void close() throws IOException
{
    close(closeMode);
}

/**
 * Remove this instance from the leadership election. If this instance is the leader, leadership
 * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
 * instances must eventually be closed.
 *
 * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
 * @throws IOException errors
 */
public synchronized void close(CloseMode closeMode) throws IOException
{
    Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
    Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
    cancelStartTask();
    try
    {
        setNode(null);
        client.removeWatchers();
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        throw new IOException(e);
    }
    finally
    {
        client.getConnectionStateListenable().removeListener(listener);
        switch ( closeMode )
        {
            case NOTIFY_LEADER:
            {
                setLeadership(false);
                listeners.clear();
                break;
            }
            default:
            {
                listeners.clear();
                setLeadership(false);
                break;
            }
        }
    }
}

private synchronized void setLeadership(boolean newValue)
{
    boolean oldValue = hasLeadership.getAndSet(newValue);
    if ( oldValue && !newValue )
    { // Lost leadership, was true, now false
        listeners.forEach(new Function<LeaderLatchListener, Void>()
        {
            @Override
            public Void apply(LeaderLatchListener listener)
            {
                listener.notLeader();
                return null;
            }
        });
    }
    else if ( !oldValue && newValue )
    { // Gained leadership, was false, now true
        listeners.forEach(new Function<LeaderLatchListener, Void>()
        {
            @Override
            public Void apply(LeaderLatchListener input)
            {
                input.isLeader();
                return null;
            }
        });
    }
    notifyAll();
}
- The close method removes the LeaderLatch from the election; if the latch is the leader, its leadership is released
- close cancels the start task, sets the node to null (which deletes the latch's znode), removes its watchers and the ConnectionStateListener, sets leadership to false, and may notify the registered listeners
- If closeMode is NOTIFY_LEADER, leadership is set to false first and the listeners are cleared afterwards, so they are still notified. Otherwise the listeners are cleared first and leadership is set to false afterwards, so no one is notified
- setLeadership compares the old and new values: it calls notLeader() on each listener when leadership is lost and isLeader() when leadership is gained
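The effect of the two orderings in close can be seen in a small JDK-only model. The CloseModeSketch class is hypothetical and tracks only notLeader listeners as Runnables; SILENT is the other value of Curator's LeaderLatch.CloseMode enum and corresponds to the default branch in the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the close-mode ordering; not Curator code.
class CloseModeSketch {
    enum CloseMode { SILENT, NOTIFY_LEADER }

    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final List<Runnable> notLeaderListeners = new ArrayList<>();

    void addNotLeaderListener(Runnable listener) {
        notLeaderListeners.add(listener);
    }

    // Same old/new comparison as setLeadership: notify only on a true -> false change.
    void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            for (Runnable listener : new ArrayList<>(notLeaderListeners)) {
                listener.run();
            }
        }
    }

    void close(CloseMode mode) {
        if (mode == CloseMode.NOTIFY_LEADER) {
            setLeadership(false);       // listeners are still registered, so they fire
            notLeaderListeners.clear();
        } else {
            notLeaderListeners.clear(); // listeners are removed first, so nothing fires
            setLeadership(false);
        }
    }

    public static void main(String[] args) {
        for (CloseMode mode : CloseMode.values()) {
            CloseModeSketch latch = new CloseModeSketch();
            latch.setLeadership(true);
            latch.addNotLeaderListener(() -> System.out.println(mode + ": notLeader fired"));
            latch.close(mode);
        }
    }
}
```

Running main prints a line only for NOTIFY_LEADER, which is the practical difference between the two modes: pick NOTIFY_LEADER when listeners must observe the loss of leadership caused by close itself.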
ConnectionStateListener
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
private final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        handleStateChange(newState);
    }
};

private void handleStateChange(ConnectionState newState)
{
    switch ( newState )
    {
        default:
        {
            // NOP
            break;
        }
        case RECONNECTED:
        {
            try
            {
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                {
                    reset();
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("Could not reset leader latch", e);
                setLeadership(false);
            }
            break;
        }
        case SUSPENDED:
        {
            if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
            {
                setLeadership(false);
            }
            break;
        }
        case LOST:
        {
            setLeadership(false);
            break;
        }
    }
}
- LeaderLatch registers its own ConnectionStateListener, which handles the RECONNECTED, SUSPENDED, and LOST states
- LOST always calls setLeadership(false); SUSPENDED does so only if the error policy treats SUSPENDED as an error state. setLeadership compares the old and new values, so listener.notLeader() is called only if the latch was the leader
- For RECONNECTED, reset is invoked to re-register the node if the latch is not currently the leader, or if the error policy treats SUSPENDED as an error state
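The branching in handleStateChange reduces to a small decision table. The sketch below models it as a pure function: the ConnectionStateSketch class, the Action enum, and the suspendedIsError parameter are hypothetical, with suspendedIsError standing in for client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED).

```java
// Hypothetical decision-table model of handleStateChange; not Curator code.
class ConnectionStateSketch {
    // Same constants as Curator's ConnectionState enum, redeclared to stay self-contained.
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    enum Action { NONE, RESET, DROP_LEADERSHIP }

    static Action onStateChange(ConnectionState newState, boolean hasLeadership, boolean suspendedIsError) {
        switch (newState) {
            case RECONNECTED:
                // Re-register unless leadership was safely kept across the suspension.
                return (suspendedIsError || !hasLeadership) ? Action.RESET : Action.NONE;
            case SUSPENDED:
                return suspendedIsError ? Action.DROP_LEADERSHIP : Action.NONE;
            case LOST:
                return Action.DROP_LEADERSHIP;
            default:
                return Action.NONE;
        }
    }

    public static void main(String[] args) {
        for (ConnectionState state : ConnectionState.values()) {
            System.out.println(state + " as leader, SUSPENDED is an error: "
                    + onStateChange(state, true, true));
        }
    }
}
```

When SUSPENDED is not treated as an error, a leader that reconnects before its session expires keeps leadership without re-registering, since its ephemeral node is still present.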
Summary
- The LeaderLatch recipe in Curator recipes gives us a convenient way to run a leader election, and provides LeaderLatchListener for custom handling of leadership changes
- LeaderLatch uses ZooKeeper's EPHEMERAL_SEQUENTIAL nodes, whose names are numbered automatically. The default LOCK_NAME is latch-, and because the node is created with protection, PROTECTED_PREFIX (_c_) and the protectedId (a UUID) are added in front. The final node name has the format PROTECTED_PREFIX + UUID + "-" + LOCK_NAME + sequence number, for example _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045
- LeaderLatch uses a ConnectionStateListener to react to connection changes that affect its own node
- The node at index 0 is the leader. Every other node sets a watcher on the node immediately before it. When that node is deleted, getChildren and checkLeadership run again to check whether this node is now first among the children. If it is, it becomes leader and the listeners are notified; if not, it watches its new predecessor. This chained design is quite elegant
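The chained handover described in the summary can be simulated in memory. This is a toy model, not Curator code: the ElectionChain class is hypothetical, a TreeMap keyed by sequence number stands in for the children of the latch path, and leave stands in for a latch closing or its session expiring.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the election chain; not Curator code.
class ElectionChain {
    // Sequence number -> participant id, ordered like the sorted children.
    private final TreeMap<Integer, String> nodes = new TreeMap<>();
    private int nextSequence = 0;

    // Joining creates a node with the next sequence number.
    int join(String id) {
        int sequence = nextSequence++;
        nodes.put(sequence, id);
        return sequence;
    }

    // Leaving deletes the participant's node.
    void leave(int sequence) {
        nodes.remove(sequence);
    }

    // The participant with the lowest sequence number is the leader.
    String leader() {
        Map.Entry<Integer, String> first = nodes.firstEntry();
        return first == null ? null : first.getValue();
    }

    // The predecessor this participant would watch, or null if it is the leader.
    String watchedBy(int sequence) {
        Map.Entry<Integer, String> previous = nodes.lowerEntry(sequence);
        return previous == null ? null : previous.getValue();
    }

    public static void main(String[] args) {
        ElectionChain chain = new ElectionChain();
        int a = chain.join("client1");
        int b = chain.join("client2");
        int c = chain.join("client3");
        System.out.println(chain.leader() + " leads; client2 watches " + chain.watchedBy(b)
                + ", client3 watches " + chain.watchedBy(c));
        chain.leave(a);
        System.out.println(chain.leader() + " leads after client1 leaves");
    }
}
```

If a participant in the middle of the chain leaves, only its successor is affected: it re-checks the children and moves its watch to the new predecessor, while the leader is unchanged.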