
Note: this article is based mainly on the official documentation; see curator.apache.org/index.html for details.

Curator is Netflix’s open-source ZooKeeper client and is now a top-level Apache project. Compared with the native client shipped with ZooKeeper, Curator offers a higher level of abstraction that simplifies ZooKeeper client development. It handles many low-level details of ZooKeeper client programming, including connection reconnection, repeated watcher registration, and NodeExistsException handling.

Curator consists of a series of modules; the ones most relevant to the average developer are curator-framework and curator-recipes, described below.

1. Maven dependency issues

The latest version, Curator 4.0, supports ZooKeeper 3.4.x and 3.5, but note that the ZooKeeper version pulled in transitively by Curator 4.0 must match the version actually running on the server. Take ZooKeeper 3.4.6 as an example:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

2.curator-framework

public static CuratorFramework getClient() {
    return CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .connectionTimeoutMs(15 * 1000) // connection timeout, default 15 seconds
            .sessionTimeoutMs(60 * 1000)    // session timeout, default 60 seconds
            .namespace("arch")              // set the namespace
            .build();
}
 
public static void create(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    client.create().creatingParentsIfNeeded().forPath(path, payload);
}
 
public static void createEphemeral(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}
 
public static String createEphemeralSequential(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
}
 
public static void setData(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
    client.setData().forPath(path, payload);
}
 
public static void delete(final CuratorFramework client, final String path) throws Exception {
    client.delete().deletingChildrenIfNeeded().forPath(path);
}
 
public static void guaranteedDelete(final CuratorFramework client, final String path) throws Exception {
    client.delete().guaranteed().forPath(path);
}
 
public static String getData(final CuratorFramework client, final String path) throws Exception {
    return new String(client.getData().forPath(path));
}
 
public static List<String> getChildren(final CuratorFramework client, final String path) throws Exception {
    return client.getChildren().forPath(path);
}
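The ExponentialBackoffRetry(1000, 3) used in getClient() above sleeps an increasing, randomized interval between retries. The sketch below models the delay calculation; the formula (baseSleepTimeMs times a random factor below 2^(retryCount+1), floored at 1) is my reading of Curator's behavior and should be treated as an assumption, not its exact API:

```java
import java.util.Random;

// Sketch of an exponential backoff delay, modeled on my reading of
// Curator's ExponentialBackoffRetry(baseSleepTimeMs, maxRetries).
// Assumed formula: sleep = baseSleepTimeMs * max(1, random below 2^(retryCount + 1))
public class BackoffSketch {
    private static final Random RANDOM = new Random();

    // retryCount is 0 for the first retry
    static long sleepMs(int baseSleepTimeMs, int retryCount) {
        return (long) baseSleepTimeMs * Math.max(1, RANDOM.nextInt(1 << (retryCount + 1)));
    }

    public static void main(String[] args) {
        // With ExponentialBackoffRetry(1000, 3) as in getClient():
        // the first retry waits exactly 1s, later retries wait a bounded random multiple.
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + " would sleep " + sleepMs(1000, retry) + " ms");
        }
    }
}
```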

3.curator-recipes

curator-recipes provides reference implementations for typical ZooKeeper usage scenarios. This section focuses on the components commonly used in development.

Event listeners

ZooKeeper natively supports event listening by registering a Watcher, but this is not especially convenient: the developer must re-register the Watcher repeatedly, which is tedious. Curator introduces Caches to listen for changes on the ZooKeeper server. A Cache is Curator's wrapper around event listening; its event handling can be viewed roughly as a comparison between a local cached view and the remote ZooKeeper view. At the same time, Curator automatically re-registers listeners for the developer, greatly simplifying what is a tedious process with the native API.

1) Node Cache

public static void nodeCache() throws Exception {
    final String path = "/nodeCache";
    final CuratorFramework client = getClient();
    client.start();
 
    delete(client, path);
    create(client, path, "cache".getBytes());
 
    final NodeCache nodeCache = new NodeCache(client, path);
    nodeCache.start(true);
    nodeCache.getListenable()
            .addListener(() -> System.out.println("node data change, new data is " + new String(nodeCache.getCurrentData().getData())));
 
    setData(client, path, "cache1".getBytes());
    setData(client, path, "cache2".getBytes());
 
    Thread.sleep(1000);
 
    client.close();
}
Copy the code

NodeCache listens on a single specified node. After a listener is registered, the listener is notified whenever the node changes.

2) Path Cache

Path Cache is used to listen for events on the child nodes of a ZNode: child added, updated, and removed. The Path Cache keeps the child-node state in sync and passes each event to a registered PathChildrenCacheListener.

public static void pathChildrenCache() throws Exception {
        final String path = "/pathChildrenCache";
        final CuratorFramework client = getClient();
        client.start();
 
        final PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
 
        cache.getListenable().addListener((client1, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED:" + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED:" + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED:" + event.getData().getPath());
                    break;
                case CONNECTION_LOST:
                    System.out.println("CONNECTION_LOST:" + event.getData().getPath());
                    break;
                case CONNECTION_RECONNECTED:
                    System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath());
                    break;
                case CONNECTION_SUSPENDED:
                    System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath());
                    break;
                case INITIALIZED:
                    System.out.println("INITIALIZED:" + event.getData().getPath());
                    break;
                default:
                    break;
            }
        });

        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);

        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        Thread.sleep(1000);

        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);

        // changes to the listened node itself are not notified
        client.delete().forPath(path);
        Thread.sleep(1000);

        client.close();
}

3) the Tree Cache

Tree Cache is a combination of Path Cache and Node Cache: it listens for create, update, and delete events on a path and caches the data of all nodes under that path.

public static void treeCache() throws Exception {
    final String path = "/treeChildrenCache";
    final CuratorFramework client = getClient();
    client.start();
 
    final TreeCache cache = new TreeCache(client, path);
    cache.start();
 
    cache.getListenable().addListener((client1, event) -> {
        switch (event.getType()){
            case NODE_ADDED:
                System.out.println("NODE_ADDED:" + event.getData().getPath());
                break;
            case NODE_REMOVED:
                System.out.println("NODE_REMOVED:" + event.getData().getPath());
                break;
            case NODE_UPDATED:
                System.out.println("NODE_UPDATED:" + event.getData().getPath());
                break;
            case CONNECTION_LOST:
                System.out.println("CONNECTION_LOST:" + event.getData().getPath());
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath());
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath());
                break;
            case INITIALIZED:
                System.out.println("INITIALIZED:" + event.getData().getPath());
                break;
            default:
                break;
        }
    });

    client.create().withMode(CreateMode.PERSISTENT).forPath(path);
    Thread.sleep(1000);

    client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
    Thread.sleep(1000);
 
    setData(client, path, "test".getBytes());
    Thread.sleep(1000);
 
    client.delete().forPath(path + "/c1");
    Thread.sleep(1000);
 
    client.delete().forPath(path);
    Thread.sleep(1000);
 
    client.close();
}

Leader election

Curator provides two approaches: Leader Latch and Leader Election.

1) Leader Latch

One of the candidates is selected at random as the leader. Once selected, the leader holds leadership until close() is called to release it; until then, no other candidate can become leader.

public class LeaderLatchTest {
 
    private static final String PATH = "/demo/leader";
 
    public static void main(String[] args) {
        List<LeaderLatch> latchList = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                client.start();
                clients.add(client);
 
                final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);
                leaderLatch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!");
                    }
 
                    @Override
                    public void notLeader() {
                        System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!");
                    }
                });
                latchList.add(leaderLatch);
                leaderLatch.start();
            }
            Thread.sleep(1000 * 60);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
 
            for (LeaderLatch leaderLatch : latchList) {
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }
 
    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) // connection timeout, default 15 seconds
                .sessionTimeoutMs(60 * 1000)    // session timeout, default 60 seconds
                .namespace("arch")              // set the namespace
                .build();
    }
}

2) Leader Election

With a LeaderSelectorListener you can control leadership and release it when appropriate, so that every node has a chance to gain leadership. A LeaderLatch, by contrast, holds leadership once gained and does not release it unless close() is called.

public class LeaderSelectorTest {
    private static final String PATH = "/demo/leader";
 
    public static void main(String[] args) {
        List<LeaderSelector> selectors = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                client.start();
                clients.add(client);
 
                final String name = "client#" + i;
                LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        System.out.println(name + ":I am leader.");
                        Thread.sleep(2000);
                    }
                });
                leaderSelector.autoRequeue();
                leaderSelector.start();
                selectors.add(leaderSelector);
            }
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
 
            for (LeaderSelector selector : selectors) {
                CloseableUtils.closeQuietly(selector);
            }
 
        }
    }
 
    public static CuratorFramework getClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(15 * 1000) // connection timeout, default 15 seconds
                .sessionTimeoutMs(60 * 1000)    // session timeout, default 60 seconds
                .namespace("arch")              // set the namespace
                .build();
    }
}

Distributed locks

1) Shared Reentrant Lock

Shared means the lock is globally visible and can be requested by any client. Reentrant is similar to the JDK’s ReentrantLock: the same client can acquire the lock multiple times without blocking. It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)
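The reentrancy semantics mirror the JDK’s ReentrantLock. Since InterProcessMutex needs a live ZooKeeper ensemble, here is a minimal local analogy using java.util.concurrent, a sketch of the semantics rather than Curator’s API:

```java
import java.util.concurrent.locks.ReentrantLock;

// Local analogy for InterProcessMutex's reentrancy (JDK ReentrantLock in
// place of the distributed lock): the same thread may acquire again without
// blocking, and each acquire must be balanced by a release.
public class ReentrancyDemo {
    static int holdCountAfterTwoAcquires() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                        // re-entrant: does not block
        int holds = lock.getHoldCount();    // this thread now holds the lock twice
        lock.unlock();
        lock.unlock();                      // balanced releases
        return holds;
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfterTwoAcquires()); // prints 2
    }
}
```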

Locks are acquired through acquire(), which also provides a timeout variant:

/**
 * Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
 * re-entrantly. Each call to acquire must be balanced by a call to release()
 */
public void acquire();
 
/**
 * Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
 * call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
 * Parameters:
 * time - time to wait
 * unit - time unit
 * Returns:
 * true if the mutex was acquired, false if not
 */
public boolean acquire(long time, TimeUnit unit);

The lock is released through release(). InterProcessMutex instances are reusable. The ZooKeeper recipes wiki also defines a negotiated revocation mechanism; to make a mutex revocable, call the following method:

/**
 * Make the lock revocable. The listener is called when another process or thread wants you to release the lock.
 * Parameters:
 * listener - the listener
 */
public void makeRevocable(RevocationListener<T> listener)

2) Shared Lock (non-reentrant)

Use InterProcessSemaphoreMutex; the methods are called in a similar way. The difference is that this lock is non-reentrant: it cannot be re-acquired even within the same thread.

3) Shared Reentrant Read Write Lock

Similar to the JDK’s ReentrantReadWriteLock. A read-write lock manages a pair of related locks: one for reading, one for writing. The read lock can be held by multiple processes simultaneously as long as no write lock is held; while the write lock is held, reads block. The lock is reentrant: a thread holding the write lock can also acquire the read lock, but not the other way around. This means a write lock can be downgraded to a read lock, e.g. acquire write lock -> acquire read lock -> release write lock; upgrading from a read lock to a write lock is not possible. It is implemented mainly by two classes:

InterProcessReadWriteLock
InterProcessLock
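The write-to-read downgrade described above can be demonstrated with the JDK’s local ReentrantReadWriteLock, a sketch of the semantics rather than the distributed InterProcessReadWriteLock itself:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Local analogy for the write -> read downgrade (JDK ReentrantReadWriteLock
// in place of InterProcessReadWriteLock): a thread holding the write lock
// may take the read lock, then drop the write lock.
public class DowngradeDemo {
    static boolean downgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        rw.readLock().lock();       // allowed while the write lock is held
        rw.writeLock().unlock();    // downgrade complete: only the read lock remains
        boolean readHeld = rw.getReadHoldCount() == 1 && !rw.isWriteLocked();
        rw.readLock().unlock();
        return readHeld;
    }

    public static void main(String[] args) {
        System.out.println(downgrade()); // prints true
    }
}
```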

4) Shared Semaphore

A counting semaphore, similar to the JDK’s Semaphore. Where the JDK Semaphore maintains a set of permits, Curator calls these leases. Note that all instances must use the same numberOfLeases value. Calling acquire() returns a Lease object; the client must close these Lease objects in a finally block, otherwise the leases are lost. However, if a client session is lost for some reason, such as a crash, the leases held by that client are closed automatically, so other clients can continue to use them. Leases can also be returned with:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough leases, the requesting thread blocks. Overloads with timeouts are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main categories are:

InterProcessSemaphoreV2
Lease
SharedCountReader
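The lease behavior, grant up to the shared limit, deny when not enough remain, return in finally, can be sketched locally with the JDK’s Semaphore standing in for the distributed InterProcessSemaphoreV2:

```java
import java.util.concurrent.Semaphore;

// Local analogy for lease acquisition (JDK Semaphore in place of the
// distributed InterProcessSemaphoreV2): 3 "leases" are available; asking
// for more than remain is denied rather than partially granted.
public class LeaseDemo {
    static boolean[] tryLeases() {
        Semaphore leases = new Semaphore(3);
        boolean first = leases.tryAcquire(2);   // 2 of 3 leases granted
        boolean second = leases.tryAcquire(2);  // only 1 lease left: denied
        leases.release(2);                      // return the leases, like returnAll()
        return new boolean[]{first, second, leases.availablePermits() == 3};
    }

    public static void main(String[] args) {
        for (boolean b : tryLeases()) {
            System.out.println(b);
        }
    }
}
```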

5) Multi Shared Lock

Multi Shared Lock is a container for locks. When acquire is called, all acquire locks are acquired, and if the request fails, all locks are released. All locks are also released when release is called (failures are ignored). Basically, it is a representation of a group lock, on which release requests are passed to all the locks it contains. There are two main classes involved:

InterProcessMultiLock
InterProcessLock

Its constructor needs a collection of contained locks, or a set of ZooKeeper paths.

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

The barrier fence

1) DistributedBarrier

The barrierPath parameter in the DistributedBarrier constructor identifies a barrier: instances with the same barrierPath refer to the same barrier. A barrier is generally used as follows:

1. The main client sets the barrier.
2. Other clients call waitOnBarrier() to wait for the barrier to be removed; their processing threads block.
3. The main client removes the barrier, and the blocked clients all resume at the same time.

The main methods of the DistributedBarrier class are:

setBarrier() - set the barrier
waitOnBarrier() - wait on the barrier
removeBarrier() - remove the barrier

2) DistributedDoubleBarrier

A double barrier allows clients to synchronize the start and end of a computation. When enough processes have joined the double barrier, the computation starts; when it finishes, the processes leave the barrier. Double barriers are implemented by the DistributedDoubleBarrier class, whose constructor is as follows:

// client - the client
// barrierPath - path to use
// memberQty - the number of members in the barrier
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)

memberQty is the number of members. When enter() is called, the caller blocks until all members have called enter(); when leave() is called, the calling thread likewise blocks until all members have called leave(). Note: memberQty is a threshold, not a limit; once the number of waiters reaches or exceeds this value, the barrier opens.

As with DistributedBarrier, the barrierPath parameter determines whether two instances refer to the same barrier. A double barrier is used as follows:

1. Multiple clients create a DistributedDoubleBarrier on the same path and call enter(); each waits until the number of waiters reaches memberQty.
2. When the count reaches memberQty, the clients all stop blocking and run concurrently until they call leave(), where they again block until memberQty clients are waiting.
3. Once memberQty clients are blocked in leave(), they all stop blocking and continue running.

The main methods of the DistributedDoubleBarrier class are:

enter(), enter(long maxWait, TimeUnit unit) - wait to enter the barrier
leave(), leave(long maxWait, TimeUnit unit) - wait to leave the barrier

DistributedDoubleBarrier monitors the connection state; enter() and leave() throw an exception if the connection is lost.
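The enter() behavior, every member blocks until memberQty of them have arrived, then all proceed together, can be sketched locally with the JDK’s CyclicBarrier standing in for the distributed recipe:

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Local analogy for DistributedDoubleBarrier.enter() (JDK CyclicBarrier in
// place of the distributed recipe): memberQty threads each block in await()
// until all of them have arrived, then all proceed together.
public class EnterBarrierDemo {
    static int runMembers(int memberQty) {
        CyclicBarrier enter = new CyclicBarrier(memberQty);
        AtomicInteger passed = new AtomicInteger();
        Thread[] members = new Thread[memberQty];
        for (int i = 0; i < memberQty; i++) {
            members[i] = new Thread(() -> {
                try {
                    enter.await();            // blocks until memberQty threads arrive
                    passed.incrementAndGet(); // runs only after the barrier opens
                } catch (Exception ignored) {
                }
            });
            members[i].start();
        }
        for (Thread t : members) {
            try {
                t.join();
            } catch (InterruptedException e) {
                return -1;
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(runMembers(5)); // prints 5
    }
}
```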

Counters

ZooKeeper can be used to implement a counter shared across a cluster: as long as every client uses the same path, ZooKeeper's consistency guarantees that they see the latest value. Curator provides two counters, one for int and one for long.

1) SharedCount

This class counts with an int. Three main classes are involved:

* SharedCount
* SharedCountReader
* SharedCountListener

SharedCount represents a counter. You can add a SharedCountListener to it, which is notified when the counter changes, and a SharedCountReader reads the latest value, either the plain value or a VersionedValue carrying version information.

2) DistributedAtomicLong

Besides counting over a larger range than SharedCount, DistributedAtomicLong first tries to set the count with an optimistic lock; if that fails (for example, because the count was updated by another client in the meantime), it falls back to an InterProcessMutex to update the count. The counter supports a series of operations:

  • get(): gets the current value
  • increment(): adds one
  • decrement(): subtracts one
  • add(): adds a specific value
  • subtract(): subtracts a specific value
  • trySet(): tries to set the count
  • forceSet(): forcibly sets the count

You must check succeeded() on the returned result, which indicates whether the operation succeeded. If it did, preValue() is the value before the operation and postValue() the value after.
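The optimistic first attempt can be sketched locally with the JDK’s AtomicLong standing in for the distributed counter; the compare-and-set retry loop below illustrates the pattern, not DistributedAtomicLong’s actual implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

// Local sketch of the optimistic pattern described above (JDK AtomicLong in
// place of DistributedAtomicLong): read the current value, then set value+1
// only if no one changed it in the meantime (compare-and-set); on a
// conflict, re-read and retry.
public class OptimisticIncrement {
    static long increment(AtomicLong counter) {
        while (true) {
            long preValue = counter.get();   // value before the operation
            long postValue = preValue + 1;   // value after the operation
            if (counter.compareAndSet(preValue, postValue)) {
                return postValue;            // success, analogous to succeeded()
            }
            // another writer updated the counter first; retry with a fresh read
        }
    }

    static long demo() {
        return increment(new AtomicLong(41));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```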

End

Curator abstracts away and simplifies many complex ZooKeeper operations, and is recommended over the ZkClient package for development.