background
Just recently, ZooKeeper may be used in the project, so I sorted out the previous ZK materials, and everyone is interested in taking a look.
Introduction to Basic Concepts
ZooKeeper is a distributed, open source distributed application collaboration service. ZooKeeper is designed to encapsulate complex and error-prone distributed consistency services into an efficient and reliable set of primitives that can be delivered to users in a series of easy-to-use interfaces. Hereinafter referred to as ZK.
Typical Application Scenarios
-
Configuration management, similar to a database
-
DNS service
-
Group member Management
-
A distributed lock
As ZK data are stored in memory, the data volume is mostly maintained in hundreds of megabytes, and the database data of tens of GB is also common.
ZK cluster setup
Docker-compose is strongly recommended for development and testing, which is very convenient. Production requires multiple independent machines for cluster implementation, otherwise there would be no need to use ZK.
version: '3.1'
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
ports:
- 2181: 2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: Server. 1 = 0.0.0.0:2888-3888; 2181 server.2=zoo2:2888:3888; 2181 server.3=zoo3:2888:3888; 2181
zoo2:
image: zookeeper
restart: always
hostname: zoo2
ports:
- 2182: 2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888; 2181 Server. 2 = 0.0.0.0:2888-3888; 2181 server.3=zoo3:2888:3888; 2181
zoo3:
image: zookeeper
restart: always
hostname: zoo3
ports:
- 2183: 2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888; 2181 server.2=zoo2:2888:3888; 2181 Server. 3 = 0.0.0.0:2888-3888; 2181
Copy the code
ZK interaction based on Java
Rely on the library
dependencies {
implementation "Org. Apache. They are: they are: 2.6.2." "
implementation "Org. Apache. Curator: curator - recipes: 4.2.0"
implementation "Org. Apache. Curator: curator - x - discovery: 4.2.0"
implementation "Org. Apache. Curator: curator - x - discovery - server: 4.2.0"
testImplementation "Junit: junit: 4.12"
testImplementation "com.google.truth:truth:1.0"
}
Copy the code
The original API
Basic test cases
package org.yao;
import com.google.common.collect.ImmutableList;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import static com.google.common.truth.Truth.assertThat;
public class ZooKeeperTests {
private String pathPrefix = "/multi";
private ZooKeeper zk;
private CountDownLatch startLatch;
private CountDownLatch closeLatch;
private AsyncCallback.MultiCallback callback;
private String path1 = pathPrefix + "1";
private String path2 = pathPrefix + "2";
private byte[] data1 = {0x1};
private byte[] data2 = {0x2};
@Before
public void setUp(a) throws Exception {
// Register the callback, and after the callback, stop waiting
startLatch = new CountDownLatch(1);
callback =
(int rc, String path, Object ctx, List<OpResult> opResults) -> {
assertThat(rc).isEqualTo(KeeperException.Code.OK.intValue());
System.out.printf("delete multi executed");
closeLatch.countDown();
};
zk = new ZooKeeper("localhost".2181.new DefaultWatcher());
startLatch.await();
}
@After
public void tearDown(a) throws Exception {
// Clean up zk links
closeLatch.await();
zk.close();
}
@Test
public void testMulti(a) throws Exception {
closeLatch = new CountDownLatch(1);
// Create two znodes
Op createOp1 = Op.create(path1, data1, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Op createOp2 = Op.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Synchronous API
zk.multi(ImmutableList.of(createOp1, createOp2));
System.out.println("create multi executed");
assertThat(zk.getData(path1, false.null)).isEqualTo(data1);
assertThat(zk.getData(path2, false.null)).isEqualTo(data2);
// Delete two znodes
Op deleteOp1 = Op.delete(path1, -1);
Op deleteOp2 = Op.delete(path2, -1);
// Asynchronous API
zk.multi(ImmutableList.of(deleteOp1, deleteOp2), callback, null);
}
@Test
public void testTransaction(a) throws Exception {
closeLatch = new CountDownLatch(1);
// Create two znodes
Transaction tx = zk.transaction();
tx.create(path1, data1, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
tx.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Synchronous API
tx.commit();
System.out.println("transaction committed");
assertThat(zk.getData(path1, false.null)).isEqualTo(data1);
assertThat(zk.getData(path2, false.null)).isEqualTo(data2);
// Delete two znodes
tx = zk.transaction();
tx.delete(path1, -1);
tx.delete(path2, -1);
// Asynchronous API
tx.commit(callback, null);
}
@Test
public void testTransactionWithCheck(a) throws Exception {
closeLatch = new CountDownLatch(0);
{
Transaction tx = zk.transaction();
tx.create(path1, data1, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
tx.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
tx.check(path1, 0);
tx.check(path2, 0);
tx.commit();
}
{
Transaction tx = zk.transaction();
tx.check(path1, 0);
tx.check(path2, 0);
tx.delete(path1, 0);
tx.delete(path2, 0); tx.commit(); }}/** * getChildren does not list descendants recursively. */
@Test
public void testGetChilren(a) throws Exception {
closeLatch = new CountDownLatch(0);
List<String> paths = zk.getChildren("/a".false);
System.out.printf("child paths: %s\n", paths);
}
class DefaultWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None
&& event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("zookeeper client connected"); startLatch.countDown(); }}}}Copy the code
Curator of the API
Curator introduce
Curator is a Java client library for ZooKeeper. Curator aims to simplify the use of ZK. Code that does not use a co-curator handles its own ConnectionLossException. And provides a lock, service discovery encapsulation of the more complete implementation, to avoid duplication of wheel.
The test case
package org.yao;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import static com.google.common.truth.Truth.assertThat;
/** * Example code to demonstrate the usage of Curator client and framework. */
public class CuratorTests {
private CuratorFramework client;
private String connectString = "localhost:2181";
private RetryPolicy retryPolicy;
@Before
public void setUp(a) {
retryPolicy = new ExponentialBackoffRetry(1000.3);
client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
/* // Fluent style client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(retryPolicy) .build(); * /
// Start client
client.start();
}
@After
public void tearDown(a) {
client.close();
}
// create -> getData -> delete in synchronous mode
@Test
public void testSyncOp(a) throws Exception {
String path = "/one";
byte[] data = {'1'};
client.create().withMode(CreateMode.PERSISTENT).forPath(path, data);
byte[] actualData = client.getData().forPath(path);
assertThat(data).isEqualTo(actualData);
client.delete().forPath(path);
client.close();
}
// create -> getData -> delete in asynchronous mode
@Test
public void testAsyncOp(a) throws Exception {
String path = "/two";
final byte[] data = {'2'};
final CountDownLatch latch = new CountDownLatch(1);
// Use listener only for callbacks
client
.getCuratorListenable()
.addListener(
(CuratorFramework c, CuratorEvent event) -> {
switch (event.getType()) {
case CREATE:
System.out.printf("znode '%s' created\n", event.getPath());
// 2. getData
c.getData().inBackground().forPath(event.getPath());
break;
case GET_DATA:
System.out.printf("got the data of znode '%s'\n", event.getPath());
assertThat(event.getData()).isEqualTo(data);
// 3. Delete
c.delete().inBackground().forPath(path);
break;
case DELETE:
System.out.printf("znode '%s' deleted\n", event.getPath());
latch.countDown();
break; }});// 1. create
client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path, data);
latch.await();
client.close();
}
@Test
public void testWatch(a) throws Exception {
String path = "/three";
byte[] data = {'3'};
byte[] newData = {'4'};
CountDownLatch latch = new CountDownLatch(1);
// Use listener only for watches
client
.getCuratorListenable()
.addListener(
(CuratorFramework c, CuratorEvent event) -> {
switch (event.getType()) {
case WATCHED:
WatchedEvent we = event.getWatchedEvent();
System.out.println("watched event: " + we);
if (we.getType() == Watcher.Event.EventType.NodeDataChanged
&& we.getPath().equals(path)) {
// 4. watch triggered
System.out.printf("got the event for the triggered watch\n");
byte[] actualData = c.getData().forPath(path);
assertThat(actualData).isEqualTo(newData);
}
latch.countDown();
break; }});// 1. create
client.create().withMode(CreateMode.PERSISTENT).forPath(path, data);
// 2. getData and register a watch
byte[] actualData = client.getData().watched().forPath(path);
assertThat(actualData).isEqualTo(data);
// 3. setData
client.setData().forPath(path, newData);
latch.await();
// 5. delete
client.delete().forPath(path);
}
@Test
public void testCallbackAndWatch(a) throws Exception {
String path = "/four";
byte[] data = {'4'};
byte[] newData = {'5'};
CountDownLatch latch = new CountDownLatch(2);
// Use listener for both callbacks and watches
client
.getCuratorListenable()
.addListener(
(CuratorFramework c, CuratorEvent event) -> {
switch (event.getType()) {
case CREATE:
// 2. callback for create
System.out.printf("znode '%s' created\n", event.getPath());
// 3. getData and register a watch
assertThat(client.getData().watched().forPath(path)).isEqualTo(data);
// 4. setData
client.setData().forPath(path, newData);
latch.countDown();
break;
case WATCHED:
WatchedEvent we = event.getWatchedEvent();
System.out.println("watched event: " + we);
if (we.getType() == Watcher.Event.EventType.NodeDataChanged
&& we.getPath().equals(path)) {
// 5. watch triggered
System.out.printf("got the event for the triggered watch\n");
assertThat(c.getData().forPath(path)).isEqualTo(newData);
}
latch.countDown();
break; }});// 1. create
client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path, data);
latch.await();
// 6. deleteclient.delete().forPath(path); }}Copy the code
A test case for distributed locks
package com.zew.learn;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.Test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CuratorLockTests {
private final String connectString = "10.0.4.162:2181";
private final String lockPath = "/locks";
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
@Test
public void lockTest(a) {
// Create a QTY thread pool to simulate a concurrent terminal scenario
ExecutorService service = Executors.newFixedThreadPool(QTY);
final FakeLimitedResource resource = new FakeLimitedResource();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = () -> {
// Create a client for a curator
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000.3));
try {
client.start();
ExampleClientThatLocks example = new ExampleClientThatLocks(client, lockPath, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS); }}catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
// log or do something
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertTrue(true);
}
static class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
// Simulate finite resource usage, with a random time interval
public void use(a) throws InterruptedException {
if(! inUse.compareAndSet(false.true)) {
fail("Only one client can use the lock, so this means zK is letting multiple clients get the lock.");
throw new IllegalStateException("Only one client can use this lock");
}
try {
Thread.sleep((long) (3 * Math.random()));
} finally {
inUse.set(false); }}}/** * The class responsible for locking */
static class ExampleClientThatLocks {
private final InterProcessMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
// Create a lock, although the client is different, but lockPath can be used to lock the effect
lock = new InterProcessMutex(client, lockPath);
}
/** * The core code of the actual lock is the following sentence *@paramTime Indicates the duration *@paramUnit Time unit *@throwsException Throws an Exception */
public void doWork(long time, TimeUnit unit) throws Exception {
// Try to acquire the lock, throw an exception if it has not been acquired for longer
if(! lock.acquire(time, unit)) {throw new IllegalStateException(clientName + "Unable to obtain lock");
}
try {
System.out.println(clientName + "Got the lock.");
resource.use();
} finally {
System.out.println(clientName + "Release lock");
lock.release(); // always release the lock in a finally block}}}}Copy the code
Service discovery test case
package org.yao;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
public class ServiceDiscoveryTests {
private String connectString = "localhost:2181";
/** Shows the basic usage for curator-x-discovery. */
@Test
public void testBasics(a) throws Exception {
CuratorFramework client = null;
ServiceDiscovery<String> discovery = null;
ServiceProvider<String> provider = null;
String serviceName = "test";
String basePath = "/services";
try {
client = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1));
client.start();
ServiceInstance<String> instance1 =
ServiceInstance.<String>builder().payload("plant").name(serviceName).port(10064).build();
ServiceInstance<String> instance2 =
ServiceInstance.<String>builder().payload("animal").name(serviceName).port(10065).build();
System.out.printf("instance1 id: %s\n", instance1.getId());
System.out.printf("instance2 id: %s\n", instance2.getId());
discovery =
ServiceDiscoveryBuilder.builder(String.class)
.basePath(basePath)
.client(client)
.thisInstance(instance1)
.build();
discovery.start();
discovery.registerService(instance2);
provider = discovery.serviceProviderBuilder().serviceName(serviceName).build();
provider.start();
assertThat(provider.getInstance().getId()).isNotEmpty();
assertThat(provider.getAllInstances()).containsExactly(instance1, instance2);
client.delete().deletingChildrenIfNeeded().forPath(basePath);
} finally{ CloseableUtils.closeQuietly(provider); CloseableUtils.closeQuietly(discovery); CloseableUtils.closeQuietly(client); }}}Copy the code
The resources
- ZooKeeper combat and source code analysis
- curator.apache.org/
- Github.com/apache/cura…