Apache Curator is a relatively perfect ZooKeeper client framework, which simplifies zooKeeper operation through a set of advanced APIS encapsulated. Therefore, Apache Curator is used to operate ZooKeeper in practical applications.

Source: github.com/chentianmin…

A review of official documents shows that Curator addresses three main problems:

  • encapsulationZooKeeper clientwithZooKeeper serverConnection processing between.
  • A Fluent style operation API is provided.
  • Provide abstract encapsulation of Various ZooKeeper application scenarios (recipe, distributed lock service, cluster leader election, shared counters, caching mechanism, distributed queue, etc.).

Introduction of depend on

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
Copy the code

Create a session

Create the client using static engineering methods

// Retry policy
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000.3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionStr,5000.5000, retry);
Copy the code

The newClient static factory method contains four main arguments:

Parameter names instructions
connectionString The server list, format host1: port1, host2: port2,…
retryPolicy RetryPolicy. There are four retry policies built in. RetryPolicy interface can also be implemented
sessionTimeoutMs Session timeout duration, expressed in milliseconds. The default value is 60000ms
connectionTimeoutMs Connection creation timeout, in milliseconds. The default value is 60000ms

Create the client using the Fluent style Api


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000.3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

Copy the code

Create a client that contains an isolated namespace

To isolate different Zookeeper services, you need to allocate an independent NameSpace (NameSpace) for each service, that is, specify the root path of Zookeeper (official term: add “Chroot” to Zookeeper). For example, if the client specifies the independent namespace /base, the client performs operations on the data nodes on Zookeeper based on this directory. By setting Chroot, the client application can be mapped to a subtree on the Zookeeper server. In the scenario where multiple applications share the same Zookeeper cluster, this helps isolate different applications from each other.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000.3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();
Copy the code

When the client is successfully created, call the start() method directly to start the client.

Data node operation

Creating a Data Node

Zookeeper node creation mode:

  • PERSISTENT: PERSISTENT nodes
  • PERSISTENT_SEQUENTIAL: Persistent sequential node
  • EPHEMERAL: the temporary node
  • EPHEMERAL_SEQUENTIAL: Temporary sequential node

Create a node with initial empty content

client.create().forPath("/path");
Copy the code

If no node properties are set, the node creation mode defaults to persistent node and the content defaults to empty.

Create a node with initialization

client.create().forPath("/path2"."test1".getBytes());
Copy the code

Creates a node, specifying creation mode (temporary node), with nothing

client.create().withMode(CreateMode.EPHEMERAL).forPath("/path3");
Copy the code

Create a node, specifying the creation mode (temporary node), along with initialization

client.create().withMode(CreateMode.EPHEMERAL).forPath("/path"."demo".getBytes());
Copy the code

Create a node, specify creation mode (temporary node), with initialization content, and automatically recursively create the parent node

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("/abc/path"."init".getBytes());
Copy the code

CreatingParentContainersIfNeeded () interface is very useful, because generally the developer is to create a child node must judge the existence of its parent node and if there is no direct to create throws NoNodeException, Use the creatingParentContainersIfNeeded () after the Curator automatically recursive create all the required parent node.

Deleting a Data Node

Deleting a node

 client.delete().forPath("/path");
Copy the code

Note that this method can only remove the leaf node, otherwise an exception will be thrown.

Delete a node and recursively delete all of its children

client.delete()
                .deletingChildrenIfNeeded()
                .forPath("/path");
Copy the code

Deletes a node to force the specified version to be deleted

client.delete()
      .withVersion(100)
      .forPath("/path");

Copy the code

Deleting a node is mandatory

client.delete()
      .guaranteed()
      .forPath("/path");
Copy the code

The above multiple streaming interfaces can be combined freely, for example:

client.delete()
      .guaranteed()
      .deletingChildrenIfNeeded()
      .withVersion(10086)
      .forPath("path");
Copy the code

Read data node data

Reads the data content of a node


byte[] data = client.getData().forPath("/path2");

Copy the code

Read the data content of a node and obtain the stat of the node

Stat stat = new Stat();
byte[] data = client.getData()
        .storingStatIn(stat)
        .forPath("/path2");
Copy the code

Update data on data nodes

Updates the data content of a node

client.setData().forPath("/path"."data".getBytes());
Copy the code

Updates the data content of a node to force the specified version to be updated

client.setData().withVersion(10086).forPath("/path"."data".getBytes());
Copy the code

Check whether the node exists

Stat stat = client.checkExists().forPath("/path");
Copy the code

Note: This method returns a Stat instance, null if it does not exist.

Gets all child node paths of a node

List<String> list = client.getChildren().forPath("/path");
Copy the code

This method returns a List to get a List of ZNode’s children, Path.

The transaction

Instances of the CuratorFramework contain the inTransaction() interface method, which is called to start a ZooKeeper transaction. You can compound the create, setData, check, and/or delete operations and call commit() as an atomic operation. Here’s an example:

 client.inTransaction()
                .check().forPath("/path")
                .and()
                .create().withMode(CreateMode.EPHEMERAL).forPath("/path"."data".getBytes())
                .and()
                .setData().withVersion(1000).forPath("/path"."data2".getBytes())
                .and()
                .commit();
Copy the code

The methods of create, delete, update and read mentioned above are synchronous. The asynchronous interface is provided by introducing the BackgroundCallback interface to process the result information returned by the server after the invocation of the asynchronous interface. An important callback value in the BackgroundCallback interface is CuratorEvent, which contains details about the event type, response, and node.

CuratorEventType

The event type Methods corresponding to instances of the CuratorFramework
CREATE create()
DELETE delete()
EXISTS checkExists()
GET_DATA getData()
SET_DATA setData()
CHILDREN getChildren()
SYNC sync(String,Object)
GET_ACL getACL()
SET_ACL setACL()
WATCHED Watcher(Watcher)
CLOSING close()

The response code (getResultCode())

The response code meaning
0 OK, that is, the call succeeded
4 – ConnectionLoss: The client is disconnected from the server
– 110. NodeExists: a NodeExists
– 112. SessionExpired: the session expires
ExecutorService executor = Executors.newFixedThreadPool(2);
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                / / asynchronous
// .inBackground()
                .inBackground((client1, event) -> {
                    // Event type
                    CuratorEventType type = event.getType();
                    // Result encoding
                    int resultCode = event.getResultCode();
                    System.out.println("Event Type:" + type + ", result code:" + resultCode);
                }, executor)
                .forPath("/path");
Copy the code

Note: If the inBackground() method does not specify executor, the EventThread for asynchronous processing is used by default.

The cache

It is highly recommended to use ConnectionStateListener to monitor the state of a connection, when the state of a connection is LOST, all apis under co-recipes will expire or expire.

Zookeeper native supports event listening by registering Watcher, but developers need to register it repeatedly (Watcher can only be registered once and used once). Cache is a wrapper around event listening in the exhibit and can be seen as a local Cache view of event listening that automatically handles repeated registrations for developers. Toth provides three types of Watcher(Cache) to listen for node changes.

  • Path Cache

The Path Cache is used to monitor the children of a Znode. When a child node add, update, and delete, the Path Cache will change its state, contains the child nodes of the latest, child node data and status, and the changes in the state by PathChildrenCacheListener notice.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

Create a Path Cache with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
Copy the code

To use the cache, you must call its start method and then call its close method. StartMode can be set to enable the startup mode.

  • NORMAL: NORMAL initialization.
  • BUILD_INITIAL_CACHE: Rebuild () is called before start() is called.
  • POST_INITIALIZED_EVENT: sends one data after the Cache initializes itPathChildrenCacheEvent.Type#INITIALIZEDThe event

AddListener (PathChildrenCacheListener listener) can increase the listener to monitor changes in the cache. The getCurrentData() method returns a List object that iterates through all of the child nodes.

Set/update/remove is done using the client (CuratorFramework), not the PathChildrenCache operation.

// Create path cache
/** * Note: If the cacheData parameter in new PathChildrenCache(Client, PATH, cacheData) is set to false, * then event.getData() in the example will return null and the cache will not cache the node data. * /
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.start(PathChildrenCache.StartMode.NORMAL);

// Add a listener
cache.getListenable()
        .addListener((client1, event) -> {
            // Event type
            PathChildrenCacheEvent.Type type = event.getType();
            System.out.println("Event Type:" + type);
            // Child node data
            ChildData data = event.getData();
            System.out.println("Child node data:" + data);
        });
Copy the code

Note: If the cacheData parameter in new PathChildrenCache(client, PATH, cacheData) is set to false, event.getData() in the example will return null and the cache will not cache the node data.

Node Cache

A Node Cache is similar to a Path Cache in that it listens only for a specific Node. It involves the following three classes:

  • NodeCache: Node Cache implementation class
  • NodeCacheListener: node listener
  • ChildData: Node data

GetCurrentData () will get the current state of the node, from which you can get the current value.

/ / create NodeCache
NodeCache nodeCache = new NodeCache(client, PATH);
// Start is required
nodeCache.start();

NodeCacheListener listener = () -> {
    System.out.println("Node change");
    ChildData currentData = nodeCache.getCurrentData();
    if(currentData ! =null) {
        System.out.println("Node data:" + currentData);
    }else {
        System.out.println("Node has no data"); }}; nodeCache.getListenable().addListener(listener);Copy the code

Tree Cache

The Tree Cache monitors all nodes in the entire Tree, mainly involving the following four classes:

  • TreeCache: Tree Cache implementation class
  • TreeCacheListener: listener class
  • TreeCacheEvent: Triggered event class
  • ChildData: Node data
TreeCache treeCache = new TreeCache(client, PATH);
treeCache.start();

treeCache.getListenable().addListener((client1, event) -> {
    TreeCacheEvent.Type type = event.getType();
    ChildData data = event.getData();
    System.out.println("Event Type:" + type + ", node data: + data);
});
Copy the code

Leader election

Leader elections are an important function in distributed computing. The election process looks like this: a process is assigned as organizer to distribute tasks to each node. Before a task starts, neither node knows who is the leader or coordinator. When the election algorithm starts to execute, each node eventually gets a unique node as the task leader. In addition, elections often occur when the leader breaks down unexpectedly, and a new leader has to be elected.

In a ZooKeeper cluster, the leader performs write operations and synchronizes the followers using the Zab protocol. Both the leader and followers can perform read operations. The Curator has two recipes for the leader election, LeaderSelector and LeaderLatch.

  • LeaderSelector: All surviving clients take turns as the Leader.
  • LeaderLatch: Once the Leader is elected, the leadership will not be relinquished unless a client hangs up and re-triggers the election.

LeaderLatch

The LeaderLatch negotiates with other LeaderLatches that use the same Latch path, and one of them is eventually elected leader. You can check whether the LeaderLatch instance is the leader by using the hasLeadership() method.

Similar to CountDownLatch in the JDK, LeaderLatch blocks when a request becomes leadership and must call the Close () method once LeaderLatch is no longer used. If it is the leader, leadership is released, and the other participants elect a leader.

Exception handling: The LeaderLatch instance can add a ConnectionStateListener to listen for network connection problems. When SUSPENDED or LOST, the leader no longer considers himself the leader. When RECONNECTED after a LOST connection, the LeaderLatch will delete the previous ZNode and create a new one. LeaderLatch users must consider connection issues that lead to leadership loss. It is highly recommended that you use a ConnectionStateListener.

/** * A sequential node will be created for all nodes participating in the election. The smallest node will be set as the master node. The nodes that fail to capture the Leader will monitor the deletion event of the previous node. When the master node manually calls the close() method or the master node hangs, subsequent children preempt the master. Spark uses this approach@authorChen tian Ming *@date2018/12/14 * /
public class LeaderLatchTest {

    protected static String PATH = "/francis/leader";
    private final static String connectionInfo = "127.0.0.1:2181";
    private static final int CLIENT_QTY = 10;

    public static void main(String[] args) {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();

        try {

            /** * Create 10 clients and create the corresponding LeaderLatch */
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000.3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);

                latch.addListener(new LeaderLatchListener(){
                    @Override
                    public void isLeader(a) {
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader(a) {
                        System.out.println("I am not Leader"); }}); examples.add(latch); client.start(); latch.start(); } Thread.sleep(10 _000);
            /** * gets the current leader */
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            // The current leadership can only be released through CLOSE
            currentLeader.close();

            Thread.sleep(5000);

            /** * get leader */ again
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (LeaderLatch latch : examples) {
                if (null! = latch.getState()) CloseableUtils.closeQuietly(latch); }for(CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); }}}}Copy the code

First we created 10 Leaderlatches, one of which would be elected leader after startup. Because the election will take some time, the leader cannot be obtained immediately after start. HasLeadership () checks to see if you are the leader, and returns true if so. You can use getId() to get the ID of the current leader. The current leadership can only be released by close().

LeaderSelector

LeaderSelector is used with the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

Similar LeaderLatch LeaderSelector must start: LeaderSelector. Start (); Once started, your listener’s takeLeadership() method is called when the instance gets leadership. The takeLeadership() method returns only when the leadership is released. When you are no longer using the LeaderSelector instance, you should call its close() method.

Exception Handling: The LeaderSelectorListener class inherits ConnectionStateListener. LeaderSelector must be careful of connection state changes. If the instance becomes the leader, it should respond to SUSPENDED or LOST. When a SUSPENDED state occurs, the instance must assume that it may no longer be the leader until the reconnection is successful. If the LOST state is present and the instance is no longer the Leader, the takeLeadership() method returns.

Important: the recommended treatment is thrown when received SUSPENDED or LOST CancelLeadershipException anomalies. This is an exception that causes the LeaderSelector instance to abort and cancel execution of the takeLeadership method. This is very important, you must consider expanding LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter provides the recommended processing logic.

/** * After the instance is selected as the leader, the takeLeadership method will be called for business logic processing, and the leadership will be released upon completion of the processing. * The invocation of the autoRequeue() method ensures that this instance may gain leadership after it has released it. * This ensures that each node can assume leadership. * *@authorChen tian Ming *@date2019/11/10 * /
public class LeaderSelectorTest {

    public static void main(String[] args) throws InterruptedException {
        List<LeaderSelector> leaderSelectors = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181".new ExponentialBackoffRetry(1000.3));
            client.start();
            clients.add(client);

            LeaderSelector leaderSelector = new LeaderSelector(client, "/master".new LeaderSelectorListenerAdapter() {
                @Override
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println(Thread.currentThread().getName() + " is a leader");
                    Thread.sleep(Integer.MAX_VALUE);
                }

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    super.stateChanged(client, newState); }}); leaderSelectors.add(leaderSelector); } leaderSelectors.forEach(leaderSelector -> { leaderSelector.autoRequeue(); leaderSelector.start();try {
                Thread.sleep(1000);
            } catch(InterruptedException e) { e.printStackTrace(); }}); System.out.println("= = = = = = = = = = = = = = = = = =");
        clients.forEach(client -> {
            client.close();
            try {
                Thread.sleep(1000);
            } catch(InterruptedException e) { e.printStackTrace(); }}); Thread.sleep(100 * 1000); }}Copy the code

A distributed lock

  1. It is recommended to useConnectionStateListenerMonitor the status of the connection because you no longer have the lock when you connect to LOST.
  2. Distributed lock global synchronization means that no two clients have the same lock at any one point in time.

Reentrant Shared Lock – Shared Reentrant Lock

Shared means that the lock is globally visible and can be requested by any client. Reentrant is similar to the JDK’s ReentrantLock in that it can be Reentrant. This means that the same client can obtain the lock multiple times without blocking. It is implemented by the class InterProcessMutex. Its constructor is: public InterProcessMutex(CuratorFramework client, String path).

  • throughacquire()Acquire the lock and provide a timeout mechanism.
  • throughrelease()Method to release the lock.InterProcessMutexInstances can be reused.

Zookeeper also provides a negotiated undo mechanism to support the undo of MUtex by setting an undo listener for mutex, which is implemented by calling makeRevocable(RevocationListener

Listener).

If you ask to revoke the current lock, call revoker.attemptrevoke (CuratorFramework Client, String Path), and the RevocationListener will be called back.

Code examples:

First let’s create a mock shared resource that is expected to be accessible only by a single client, otherwise there will be concurrency issues.

/** * share resources */
public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);
    private final Random random = new Random(System.currentTimeMillis());

    The /** * use method can have at most one client call * otherwise, throws an exception *@throws InterruptedException
     */
    public void use(a) throws InterruptedException {
        // In the real world we would access/maintain a shared resource here
        // This example does not allow illegal concurrent IllegalStateException in the case of locks
        // However, it is easy to throw an exception in the case of no lock because it has been sleeping for a while
        if(! inUse.compareAndSet(false.true)) {
            throw new IllegalStateException("Only one client can access it at a time!!");
        }
        try {
            Thread.sleep(random.nextInt(2 _000));
        } finally {
            inUse.set(false); }}}Copy the code

Then create an InterProcessMutexDemo class, which is responsible for requesting the lock, using the resource, and releasing the lock.

/** ** Then create an InterProcessMutexDemo class, which is responsible for requesting the lock, using the resource, and releasing the lock. *@authorChen tian Ming *@date2018/12/15 * /

public class InterProcessMutexDemo {


    /** * the code is also very simple, generating 10 clients, each client repeats the lock request - access resource - release process 10 times. * Each client is in a separate thread. As a result, the lock is used randomly and exclusively by each instance. * /

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
        // Make the lock revocable. The Listener is called when another process or thread wants you to release the lock.
        lock.makeRevocable((forLock -> {

        }));
    }


    /** * The same thread acquire again, first check whether the current mapping table (threadData) has the lock information of the thread, if so, atom +1, then return * reentrant mutex * lock execution *@param time
     * @param unit
     * @throws Exception
     */
    public void doWork(long time, TimeUnit unit) throws Exception {
        if(! lock.acquire(time, unit)) {throw new IllegalStateException(clientName + "Obtaining the mutex lock failed");
        }
        try {
            System.out.println(clientName + "Got the mutex");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + "Release the mutex");
            lock.release(); // always release the lock in a finally block}}private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = () -> {
                    // Create a client
                    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000.3));
                    try {
                        / / start
                        client.start();
                        final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                        / / execution
                        for (int j = 0; j < REPETITIONS; ++j) {
                            example.doWork(10, TimeUnit.SECONDS); }}catch (Throwable e) {
                        e.printStackTrace();
                    } finally {
                        CloseableUtils.closeQuietly(client);
                    }
                    return null;
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally{ CloseableUtils.closeQuietly(server); }}}Copy the code

Not reentrant Shared Lock – Shared Lock

This lock lacks Reentrant functionality compared to InterProcessMutex above, which means it cannot be Reentrant in the same client. This class is InterProcessSemaphoreMutex, using method and InterProcessMutex similar

See source InterProcessSemaphoreMutexDemo class.

Acquire (); acquire(); acquire(); acquire(); Acquire (); acquire(); acquire(); acquire(); acquire(); All other clients block on the first acquire() method timeout and throw an exception. It verified the InterProcessSemaphoreMutex implementation is non-reentrant lock.

Reentrant Read Write Lock – Shared Reentrant Read Write Lock

Similar to the JDK’s ReentrantReadWriteLock, one read-write lock is used for read operations and the other for write operations. Read operations can be used by multiple processes when the write lock is not in use, while the write lock does not allow reads (blocking) while in use. This lock is reentrant. A thread with a write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means that write locks can be downgraded to read locks, such as request write locks -> Request read locks -> release read locks —-> release write locks. Upgrading from a read lock to a write lock is not possible. Re-entrant read-write lock is mainly composed of two classes: InterProcessReadWriteLock, InterProcessMutex. When you first create a InterProcessReadWriteLock instance, and then according to your requirements are to read or write locks, lock type is InterProcessMutex read-write lock.

Semaphore — Shared Semaphore

A counting Semaphore is similar to JDK Semaphore. A set of permitts maintained by Semaphore in the JDK, which is called a Lease in The Exhibit. There are two ways to determine the maximum number of semaphore leases. The first way is for the user to give a path and specify the maximum LeaseSize. The second way is for the user to give a path and use the SharedCountReader class. If you do not use SharedCountReader, you must ensure that all instances use the same (maximum) number of leases across multiple processes. Otherwise, you may have instances in process A holding A maximum of 10 leases, but instances in process B holding A maximum of 20 leases, and the meaning of the lease becomes invalid.

Calling Acquire () returns a lease object. The client must close these lease objects in finally, or the leases will be lost. However, if a client session is lost for some reason, such as a crash, then the leases held by these clients are automatically closed so that other clients can continue to use them. Leases can also be returned in the following ways:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
Copy the code

Note that you can request multiple leases at once, and if Semaphore’s current leases are insufficient, the request thread will block. Overloading methods for timeout are also provided.

public Lease acquire(a)
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
Copy the code

The main classes Shared Semaphore uses include InterProcessSemaphoreV2, Lease, and SharedCountReader. Example code for use is as follows:

public class InterProcessSemaphoreDemo {


    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    /** * First we got 5 leases and finally we gave it back to Semaphore. A lease is then requested, * since Semaphore has 5 leases remaining, the request can be satisfied and one lease is returned, leaving 4 leases remaining. If there are not enough leases, block until time out. If there are not enough leases, null will be returned. If the timeout is not set, it blocks consistently. * All the locks mentioned above are fair locks. From the overall ZooKeeper perspective, each client obtains the lock in the order requested, and there is no unfair preemption. *@param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            // Create a client and start it
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000.3));
            client.start();

            / / semaphore
            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);

            // Get 5 leases
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");

            // Get a lease
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            // Process resources
            resource.use();

            // Get 5 leases, because there are only 10 at most, the previous 6 have been used, lease is not enough.
            // If time expires before all leases are acquired, the acquired subset of leases is automatically closed
            Collection<Lease> leases2 = semaphore.acquire(5.10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            // Return a lease
            System.out.println("return one lease");
            semaphore.returnLease(lease);

            // Return multiple leases
            System.out.println("return another 5 leases"); semaphore.returnAll(leases); }}}Copy the code

Multiple Shared Lock objects – Multi Shared Lock

Multi Shared Lock is a container for locks. When acquire() is called, all locks are acquired (), and if the request fails, all locks are released. All locks are also released when release is called (failures are ignored). Basically, it is a representation of a group lock, on which release requests are passed to all the locks it contains. There are two classes involved: InterProcessMultiLock and InterProcessLock. Its constructor needs a collection of contained locks, or a set of ZooKeeper paths.

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
Copy the code

Shared Shared

Share count – SharedCount

This class uses int to count. There are three main classes involved. SharedCount, SharedCountReader, SharedCountListener. SharedCount represents a counter, and you can add a SharedCountListener to it. When the counter changes, this Listener can listen for the changed event, and SharedCountReader can read the latest value. Includes literals and VersionedValue values with version information.

Distributed atomic class – Atomic

DistributedAtomicLong

As the name suggests, counters are for counting, and ZooKeeper allows you to implement a cluster shared counter. Just use the same path to get the latest counter value, which is guaranteed by the consistency of ZooKeeper. In addition to counting in a larger range than SharedCount, it first tries to set the count using an optimistic lock, and if that doesn’t work (for example, if the count has been updated by another client in the meantime), it uses InterProcessMutex to update the count.

This counter has a series of operations:

  • get(): Gets the current value
  • increment()A:
  • decrement(): minus one
  • add(): Increases a specific value
  • subtract(): Subtracts a specific value
  • trySet(): Tries to set the count
  • forceSet(): Forces the count to be set

Distributed barriers – Barriers

A distributed Barrier is a class that blocks waiting processes on all nodes until one is satisfied, and then all nodes continue. Like in a horse race, waiting for the horses to come to the starting line. All the horses galloped out at the first word of command.

DistributedBarrier

The DistributedBarrier class implements the functionality of a fence. Its constructor is public DistributedBarrier(CuratorFramework client, String barrierPath).

  1. First you need to set up the fence, which will block the threads waiting on it:setBarrier().
  2. The blocking thread then calls the method to wait for the release condition: waitOnBarrier().
  3. When the condition is met, the fence is removed and all waiting threads continue to execute:removeBarrier().

Exception handling DistributedBarrier monitors the state of the connection, and the waitOnBarrier() method throws an exception when the connection breaks.

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";


    /** * This example creates controlbarriers to set and remove barriers. We created five threads to wait on this Barrier. * Finally remove the fence before all threads continue to execute. *@param args
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000.3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);

            / / create a barrier
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            / / set the barrier
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            / / release
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000); }}}Copy the code

The double fence – DistributedDoubleBarrier

A double fence allows clients to synchronize the start and end of calculations. When enough processes join the double fence, the process begins to calculate, and when the calculation is complete, it leaves the fence. The double barrier class is DistributedDoubleBarrier. The constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Copy the code

MemberQty is the number of members, and when the Enter () method is called, the members are blocked until all members have called Enter (). When the leave() method is called, it also blocks the calling thread until all members have called leave(). It is just like the 100-meter race, the starting gun is fired, all the athletes start to run, and all the athletes run over the finish line, the race is over.

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000.3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {

                // Create a double fence
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES); Thread.sleep(Integer.MAX_VALUE); }}}Copy the code

Welcome to my open source project: a lightweight HTTP invocation framework for SpringBoot