Preface

Recap of the previous article

The previous article covered the ZAB protocol and an implementation of a distributed queue. For ZAB, it walked through the protocol flow, crash recovery when a follower loses contact, and how data is synchronized and transactions are discarded. It also laid out the code structure of the distributed queue and the basic definition of each method in the class, so you should be able to fill in the remaining code and run it yourself. The code logic of the distributed queue is shown below. Note that dotted and solid lines, and the line colors, indicate different behaviors.

Previous articles

High concurrency from scratch (1) – basic concept of Zookeeper

High concurrency from scratch (2) – Zookeeper implements distributed locking

High concurrency from scratch (3) – Setting up a Zookeeper cluster and electing a leader

High concurrency from scratch (4) – Classic application scenarios of Zookeeper

Application of ZooKeeper in the configuration center

In a distributed environment we run many services, and much of their configuration is shared. A configuration center solves two problems for us: managing system parameters in one place and modifying them dynamically.

1. What is the configuration center

Operations staff submit configuration changes, which are pushed to the configuration center. Each instance of our distributed application learns about changes in the configuration center by listening for Watch events, so the affected properties can be replaced in memory without restarting the service.
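To make "replaced in memory without restarting" concrete, here is a minimal sketch using only the JDK. The class LiveConfig and its method onConfigChanged are hypothetical names for illustration; in the real system the callback would be driven by a Watch event rather than called by hand.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory holder for an application's current configuration. */
class LiveConfig {
    // The current snapshot is swapped as a whole, so readers never see a half-applied update.
    private final AtomicReference<Properties> current = new AtomicReference<>(new Properties());

    /** Hypothetical callback: invoked when the configuration center reports a change. */
    public void onConfigChanged(Properties newProps) {
        Properties copy = new Properties();
        copy.putAll(newProps); // defensive copy, later edits by the caller do not leak in
        current.set(copy);
    }

    public String get(String key, String defaultValue) {
        return current.get().getProperty(key, defaultValue);
    }

    public static void main(String[] args) {
        LiveConfig config = new LiveConfig();

        Properties v1 = new Properties();
        v1.setProperty("server.port", "8080");
        config.onConfigChanged(v1);
        System.out.println(config.get("server.port", "none"));

        // A later change replaces the snapshot; the application keeps running.
        Properties v2 = new Properties();
        v2.setProperty("server.port", "9090");
        config.onConfigChanged(v2);
        System.out.println(config.get("server.port", "none"));
    }
}
```

Swapping a whole snapshot is only one possible design; it keeps reads lock-free at the cost of copying the properties on every change.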

2. Implementing a configuration center with ZooKeeper

ZooKeeper data structure of the configuration center

Given a ZooKeeper server, we first create a root directory for the configuration center, distributeConfigure. Note that "server" in the figure refers to one service in our cluster, and server1-file1.cnf and so on refer to configuration files 1, 2, 3, ... under that service. server.port is one configuration item, and from this layer down every item corresponds to a node in ZooKeeper. In other words, we treat each configuration item, such as the service port server.port, as a znode, and save the items to ZooKeeper one by one.

The configuration must not depend on whether a service is up or down, and we do not care about the order of the nodes (unless a service feature requires it). Configuration items are identified by name rather than by order, so we record them in persistent nodes.

The application service then has one simple job: watch these nodes and, whenever a node changes, fetch the data under it.
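The "one item, one znode" layout can be sketched as a plain mapping from configuration items to znode paths. This is only an illustration with the JDK: the class ZnodeLayout is a hypothetical name, and the cnfFile subdirectory is an assumption borrowed from the implementation later in this article.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper that shows which znode each configuration item would live in. */
class ZnodeLayout {
    static final String ROOT = "/distributeConfigure"; // root directory named in the article
    static final String FILE_DIR = ROOT + "/cnfFile";  // assumed directory for configuration files

    /** Maps every item of one configuration file to "znode path -> value". */
    static Map<String, String> toZnodes(String fileName, Properties items) {
        Map<String, String> znodes = new TreeMap<>();
        for (String key : items.stringPropertyNames()) {
            znodes.put(FILE_DIR + "/" + fileName + "/" + key, items.getProperty(key));
        }
        return znodes;
    }

    public static void main(String[] args) {
        Properties items = new Properties();
        items.setProperty("server.port", "8080");
        items.setProperty("abc.gc.a", "123");
        // Each printed line corresponds to one persistent znode and its data.
        toZnodes("trade-application.properties", items)
                .forEach((path, value) -> System.out.println(path + " = " + value));
    }
}
```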

Another way to think about the configuration center

Above, one configuration item corresponds to one znode. There is another option: when many configuration items live in the same file, the whole file can correspond to a single znode, with the contents of the file stored as the value of that znode.
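For this second approach, the whole file has to be turned into one value and back. A minimal sketch, assuming the value is stored as text and using only java.util.Properties; the class WholeFileZnode and its method names are hypothetical, and the actual write to ZooKeeper is left out.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: one configuration file <-> the data of one znode. */
class WholeFileZnode {
    /** Serializes all items into one string that could be stored as a znode's value. */
    static String toZnodeValue(Properties items) {
        try {
            StringWriter out = new StringWriter();
            items.store(out, null); // null: no custom comment line
            return out.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Parses a znode value back into configuration items. */
    static Properties fromZnodeValue(String value) {
        try {
            Properties items = new Properties();
            items.load(new StringReader(value));
            return items;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Properties items = new Properties();
        items.setProperty("abc.gc.a", "123");
        items.setProperty("abc.gc.b", "3456");
        String value = toZnodeValue(items);
        System.out.println(value);
        System.out.println(fromZnodeValue(value).equals(items));
    }
}
```

The trade-off is fewer znodes and a single watch per file, but every change rewrites and re-reads the whole file.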

3. Code for the ZooKeeper configuration center

① Role 1: O&M personnel

As mentioned above, operations staff submit configuration changes and push them to the configuration center. So we first need an interface they can use to read and write configuration files.

import java.util.Properties;

public interface ConfigureWriter {

    /**
     * Create a new configuration file.
     * @param fileName name of the configuration file
     * @param items configuration items
     * @return path of the new file on zk
     */
    String createCnfFile(String fileName, Properties items);

    /**
     * Delete a configuration file.
     * @param fileName name of the configuration file
     */
    void deleteCnfFile(String fileName);

    /**
     * Modify a configuration file.
     * @param fileName name of the configuration file
     * @param items the complete, updated set of configuration items
     */
    void modifyCnfItem(String fileName, Properties items);

    /**
     * Load a configuration file.
     * @param fileName name of the configuration file
     * @return the configuration items
     */
    Properties loadCnfFile(String fileName);
}

② The interface used by the application server

import java.util.Properties;

/**
 * ConfigureReader
 */
public interface ConfigureReader {

    /**
     * Read a configuration file.
     * @param fileName name of the configuration file
     * @return a Properties object if the configuration file exists, null if not
     */
    Properties loadCnfFile(String fileName);

    /**
     * Listen for changes to a configuration file. This only needs to be called once.
     * @param fileName name of the configuration file
     * @param changeHandler handler invoked when the configuration changes
     */
    void watchCnfFile(String fileName, ChangeHandler changeHandler);

    /**
     * Handler for configuration file changes.
     */
    interface ChangeHandler {
        /**
         * Called after the configuration file changes, with the complete set of properties.
         * @param newProp the complete, updated properties
         */
        void itemChange(Properties newProp);
    }
}

③ Test code

As mentioned earlier, operations staff use the ConfigureWriter interface to read and write configuration files. To make sure the node does not already exist on ZooKeeper, the test first calls writer.deleteCnfFile(fileName). A separate thread is then started to read the configuration file.

import java.util.Properties;

public class ConfigureTest {

    public static void main(String[] args) {
        // Simulate operations staff creating a configuration file
        ConfigureWriter writer = new ZkConfigureCenter();
        String fileName = "trade-application.properties";
        writer.deleteCnfFile(fileName); // make sure the node does not exist yet
        Properties items = new Properties();
        items.put("abc.gc.a", "123");
        items.put("abc.gc.b", "3456");
        // Create a configuration file with the contents of items
        String znodePath = writer.createCnfFile(fileName, items);
        System.out.println("new file: " + znodePath);

        // Simulate an application reading and watching the file
        new Thread(() -> readCnf()).start();

        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Update the file contents after 3 seconds
        items.put("abc.gc.a", "haha"); // modify
        items.put("abc.gc.c", "xx");   // add
        items.remove("abc.gc.b");      // delete
        writer.modifyCnfItem(fileName, items);

        // Keep the main thread alive so the listener output can be observed
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Simulate the application loading the configuration file and listening for changes.
     */
    public static void readCnf() {
        System.out.println("read and listen to the configuration file");
        ConfigureReader reader = new ZkConfigureCenter();
        String fileName = "trade-application.properties";
        Properties p = reader.loadCnfFile(fileName); // read the configuration file
        System.out.println(p);

        // Listen for changes to the configuration file
        reader.watchCnfFile(fileName, new ConfigureReader.ChangeHandler() {
            @Override
            public void itemChange(Properties newProp) {
                System.out.println("find data changed: " + newProp);
            }
        });

        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Running result

The whole point is to put each of our configuration items into ZooKeeper.

First, the configuration file /distributeConfigure/cnfFile/trade-application.properties is created, and its contents are the two items written by the test code (abc.gc.a and abc.gc.b).

The client also reads the contents of the configuration file.

We then modify the configuration file. As the test code shows, we perform each of the three operations once: add, delete, and modify.

If the connection is lost, an error is reported.

We can open ZooKeeper's command-line client (zkCli) to check that the program behaves reliably: use the ls /path command to check that the nodes look right, and modify node data by hand.

There is still a problem at this point. Suppose we modify hundreds or even thousands of configuration items in one go: that would suddenly produce hundreds or thousands of notifications, so we have to consider merging them. We may also have more than one person on the operations staff, and several of them might modify the same configuration file at the same time, so we can also consider locking the configuration file so that only one person can change it at a time. This deserves some care, because ZooKeeper already commits write transactions atomically and applies writes sequentially, but we will still simulate the situation where only one person is allowed to make changes.

With that in mind, let's move on to the implementation of the configuration center, which the code above has not shown yet.

4. The concrete implementation of the configuration center

① Define a ZkConfigureCenter class that implements both the read and write interfaces. ConfigureWriter is for administrators and ConfigureReader is for application programs.

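// Imports used by the methods in ① to ⑨, assuming the I0Itec ZkClient library
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.*;
import org.I0Itec.zkclient.exception.*;

public class ZkConfigureCenter implements ConfigureWriter, ConfigureReader {}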

② Definition of member attributes

confRootPath is the root directory, confFilePath is the directory that holds the configuration files, and fileLockPath is the directory used for the simulated locks. Since each file has its own lock, there will be more than one lock, so we create a directory to hold them. Each time a write operation starts, a node is added under the lock of that file.

private String confRootPath;
private String confFilePath;
private String fileLockPath;
private static final String default_confRootPath = "/distributeConfigure";
private ZkClient client;

③ The constructor and a simple validation method

public ZkConfigureCenter() {
    this(default_confRootPath);
}

public ZkConfigureCenter(String path) {
    if (path == null || path.trim().equals("")) {
        throw new IllegalArgumentException("path cannot be an empty string");
    }
    confRootPath = path;
    confFilePath = confRootPath + "/cnfFile";
    fileLockPath = confRootPath + "/writeLock";
    client = new ZkClient("localhost:2181");
    client.setZkSerializer(new MyZkSerializer());
    if (!this.client.exists(confFilePath)) {
        try {
            // true: also create missing parent nodes
            this.client.createPersistent(confFilePath, true);
        } catch (ZkNodeExistsException e) {
            // Another instance created the node first; nothing to do
        }
    }
}

private void checkElement(String v) {
    if (v == null) throw new NullPointerException();
    if ("".equals(v.trim())) {
        throw new IllegalArgumentException("cannot be blank");
    }
    if (v.startsWith(" ") || v.endsWith(" ")) {
        throw new IllegalArgumentException("cannot have leading or trailing spaces");
    }
}

④ Create a configuration file

First, a configuration file must have a fileName; items holds the properties in the file. See the comments for details. The method uses ZkDistributeImproveLock.java, the distributed lock implemented in "High concurrency from scratch (2) – Zookeeper implements distributed locking". If you want to run the test code above, copy and paste that class, and remember to fill in the Lock methods that were left unimplemented there.

@Override
public String createCnfFile(String fileName, Properties items) {
    checkElement(fileName);
    // Create the node for the configuration file
    String cfgNode = confFilePath + "/" + fileName;
    if (client.exists(cfgNode)) {
        throw new IllegalArgumentException("[" + fileName + "] The file already exists!");
    }
    client.createPersistent(cfgNode, true);
    // Create the configuration items under the file node
    if (items == null) {
        return cfgNode;
    }
    // The distributed lock is named after the configuration file,
    // so different file names mean different locks.
    // ZkDistributeImproveLock: see "High concurrency from scratch (2) – Zookeeper implements distributed locking"
    Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath + "/" + fileName);
    distributeWriteLock.lock();
    try {
        Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
        for (Map.Entry<Object, Object> entry : entrySet) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
            String cfgItemNode = cfgNode + "/" + entry.getKey().toString();
            client.createPersistent(cfgItemNode, entry.getValue());
        }
    } finally {
        distributeWriteLock.unlock();
    }
    return cfgNode;
}

⑤ Deletion Method

deleteRecursive() deletes a node recursively. lock() blocks until the lock is acquired. In the distributed lock implementation you could instead specify a timeout for acquiring the lock: either simply give up when it expires, or make a single attempt and, if the lock cannot be acquired, conclude that someone else is modifying the file and tell the caller by returning a value or throwing an exception.

@Override
public void deleteCnfFile(String fileName) {
    checkElement(fileName);
    String cfgNode = confFilePath + "/" + fileName;
    Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath + "/" + fileName);
    // Acquire the lock
    distributeWriteLock.lock();
    try {
        client.deleteRecursive(cfgNode);
    } finally {
        // Release the lock
        distributeWriteLock.unlock();
    }
}
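The alternative mentioned above, trying for the lock with a timeout instead of blocking, might look like the sketch below. It relies only on the java.util.concurrent.locks.Lock interface, with a ReentrantLock standing in for the distributed lock; whether ZkDistributeImproveLock actually implements tryLock(timeout, unit) is an assumption, and TryLockSketch and runWithLock are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: run a write action only if the lock is obtained in time. */
class TryLockSketch {
    /**
     * @return true if the action ran, false if the lock could not be acquired,
     *         which we read as "someone else is modifying this file"
     */
    static boolean runWithLock(Lock lock, long timeoutMillis, Runnable action) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (!acquired) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock(); // local stand-in for the distributed lock
        System.out.println("free lock: " + runWithLock(lock, 100, () -> System.out.println("deleting")));

        // Another "operator" holds the lock while we try again.
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread other = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        other.start();
        held.await();
        System.out.println("busy lock: " + runWithLock(lock, 100, () -> System.out.println("deleting")));
        release.countDown();
        other.join();
    }
}
```

Returning a boolean is one of the two options named in the text; throwing an exception from the same place would work just as well.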

Below is the source code for deleteRecursive

It first fetches its own children, deletes each of them recursively, and only then deletes the node itself.
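The children-first order matters because ZooKeeper refuses to delete a znode that still has children. The idea can be sketched on an in-memory tree; this is an illustration, not the ZkClient source, and the class RecursiveDeleteSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a znode tree (path -> data). */
class RecursiveDeleteSketch {
    private final TreeMap<String, String> nodes = new TreeMap<>();

    void create(String path, String data) {
        nodes.put(path, data);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    /** Names of the direct children of a path. */
    List<String> getChildren(String path) {
        List<String> children = new ArrayList<>();
        String prefix = path + "/";
        for (String p : nodes.keySet()) {
            if (p.startsWith(prefix) && p.indexOf('/', prefix.length()) < 0) {
                children.add(p.substring(prefix.length()));
            }
        }
        return children;
    }

    /** Like a plain znode delete: refuses to remove a node that still has children. */
    boolean delete(String path) {
        if (!getChildren(path).isEmpty()) {
            throw new IllegalStateException("node has children: " + path);
        }
        if (!nodes.containsKey(path)) {
            return false;
        }
        nodes.remove(path);
        return true;
    }

    /** Depth-first: delete every child subtree first, then the node itself. */
    boolean deleteRecursive(String path) {
        for (String child : getChildren(path)) {
            if (!deleteRecursive(path + "/" + child)) {
                return false;
            }
        }
        return delete(path);
    }

    public static void main(String[] args) {
        RecursiveDeleteSketch tree = new RecursiveDeleteSketch();
        tree.create("/cnfFile", "");
        tree.create("/cnfFile/app.properties", "");
        tree.create("/cnfFile/app.properties/abc.gc.a", "123");
        tree.create("/cnfFile/app.properties/abc.gc.b", "3456");
        System.out.println(tree.deleteRecursive("/cnfFile/app.properties"));
        System.out.println(tree.exists("/cnfFile/app.properties/abc.gc.a"));
    }
}
```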

⑥ Modification Method

Here the submitted Properties object is written into znodes, and the submitted configuration is treated as the complete, latest version of the file. The main idea is to fetch the existing configuration first and then compare it with the new one.

If we used the delete-then-create approach from the demo, changing one item out of 100 would rewrite the whole file and trigger a lot of listener notifications. So the try block first fetches the existing items, and the set existentItemSet holds their names. For each submitted item: if the set contains it, we check whether the value has changed, write it only if it has, and remove the name from the set; if the set does not contain it, we create it. Whatever remains in the set afterwards is surplus (items that were never matched by the submitted configuration, since changed and unchanged items were already handled by the comparison) and is deleted.

@Override
public void modifyCnfItem(String fileName, Properties items) {
    checkElement(fileName);
    String cfgNode = confFilePath + "/" + fileName;
    if (items == null) {
        throw new NullPointerException("The configuration items to be modified cannot be null");
    }
    Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
    Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath + "/" + fileName);
    distributeWriteLock.lock();
    try {
        // Fetch the items that currently exist in zk
        List<String> itemNodes = client.getChildren(cfgNode);
        Set<String> existentItemSet = new HashSet<>(itemNodes);

        for (Map.Entry<Object, Object> entry : entrySet) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
            String itemName = entry.getKey().toString();
            String itemData = entry.getValue().toString();
            String cfgItemNode = cfgNode + "/" + itemName;
            if (existentItemSet.contains(itemName)) {
                // The item already exists in zk: write only if the value changed
                String itemNodeData = client.readData(cfgItemNode);
                if (!Objects.equals(itemNodeData, itemData)) {
                    client.writeData(cfgItemNode, itemData);
                }
                existentItemSet.remove(itemName);
            } else {
                // The item does not exist in zk yet: create it
                client.createPersistent(cfgItemNode, itemData);
            }
        }

        // What is left in existentItemSet no longer appears in the file: delete it
        if (!existentItemSet.isEmpty()) {
            for (String itemName : existentItemSet) {
                String cfgItemNode = cfgNode + "/" + itemName;
                client.delete(cfgItemNode);
            }
        }
    } finally {
        distributeWriteLock.unlock();
    }
}
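The comparison at the heart of the modification method can be isolated as a pure function, which makes the three outcomes (create, update, delete) easy to see. A minimal sketch with the JDK only; ConfigDiff and its field names are hypothetical, and the existing items are passed in as a map instead of being read from ZooKeeper.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical result of comparing the stored items with a submitted file. */
class ConfigDiff {
    final Map<String, String> toCreate = new TreeMap<>();
    final Map<String, String> toUpdate = new TreeMap<>();
    final Set<String> toDelete = new TreeSet<>();

    static ConfigDiff between(Map<String, String> existing, Properties submitted) {
        ConfigDiff diff = new ConfigDiff();
        // Names not matched by the submitted file stay in this set and end up deleted.
        Set<String> leftover = new TreeSet<>(existing.keySet());
        for (String name : submitted.stringPropertyNames()) {
            String newValue = submitted.getProperty(name);
            if (leftover.remove(name)) {
                // Existing item: touch it only when the value really changed.
                if (!Objects.equals(existing.get(name), newValue)) {
                    diff.toUpdate.put(name, newValue);
                }
            } else {
                diff.toCreate.put(name, newValue);
            }
        }
        diff.toDelete.addAll(leftover);
        return diff;
    }

    public static void main(String[] args) {
        // Same scenario as the test code: modify a, add c, delete b.
        Map<String, String> existing = new TreeMap<>();
        existing.put("abc.gc.a", "123");
        existing.put("abc.gc.b", "3456");
        Properties submitted = new Properties();
        submitted.setProperty("abc.gc.a", "haha");
        submitted.setProperty("abc.gc.c", "xx");

        ConfigDiff diff = between(existing, submitted);
        System.out.println("create " + diff.toCreate);
        System.out.println("update " + diff.toUpdate);
        System.out.println("delete " + diff.toDelete);
    }
}
```

Unchanged items appear in none of the three collections, which is exactly why they cause no write and no notification.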

⑦ Reading method

This one is simple and needs little explanation.

@Override
public Properties loadCnfFile(String fileName) {
    if (!fileName.startsWith("/")) {
        fileName = confFilePath + "/" + fileName;
    }
    return loadNodeCnfFile(fileName);
}

private Properties loadNodeCnfFile(String cfgNode) {
    checkElement(cfgNode);
    if (!client.exists(cfgNode)) {
        throw new ZkNoNodeException(cfgNode);
    }
    // Read the configuration items and load them into a Properties object
    List<String> itemNodes = client.getChildren(cfgNode);
    if (itemNodes == null || itemNodes.isEmpty()) {
        return new Properties();
    }
    Properties file = new Properties();
    itemNodes.stream().forEach((e) -> {
        String itemNameNode = cfgNode + "/" + e;
        // true: return null instead of throwing if the item has just been deleted
        String data = client.readData(itemNameNode, true);
        if (data != null) { // Properties does not accept null values
            file.put(e, data);
        }
    });
    return file;
}

⑧ Set the corresponding listening event

Watching only the data of the child nodes leaves a gap (I found this by opening ZooKeeper's command-line client and adding a node by hand). When a new item is added, our handleDataChange listener is not triggered, because the node did not exist when the listeners were registered, so no data listener was ever set on it. We therefore also need handleChildChange() to cover this case, which is why the code ends with client.subscribeChildChanges(...): it watches the parent node for changes to its children. fileNodePath is the path of that parent node.

See ⑨ for triggerHandler

@Override
public void watchCnfFile(String fileName, ChangeHandler changeHandler) {
    if (!fileName.startsWith("/")) {
        fileName = confFilePath + "/" + fileName;
    }
    final String fileNodePath = fileName;
    // Read the file to find out which items need to be watched
    Properties p = loadNodeCnfFile(fileNodePath);
    if (p != null) {
        // Changes within waitTime seconds are merged into one event
        final int waitTime = 5;
        final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
        scheduled.setRemoveOnCancelPolicy(true);
        final List<ScheduledFuture<?>> futureList = new ArrayList<ScheduledFuture<?>>();

        Set<Map.Entry<Object, Object>> entrySet = p.entrySet();
        for (Map.Entry<Object, Object> entry : entrySet) {
            System.out.println("monitor: " + fileNodePath + "/" + entry.getKey().toString());
            client.subscribeDataChanges(fileNodePath + "/" + entry.getKey().toString(), new IZkDataListener() {
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println("trigger delete: " + dataPath);
                    triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
                }

                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.println("trigger change: " + dataPath);
                    triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
                }
            });
        }

        // Watch the parent node so that added or removed items are noticed as well
        client.subscribeChildChanges(fileNodePath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("trigger child: " + parentPath);
                triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
            }
        });
    }
}
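One thing the listing leaves open: data listeners are registered only for the items that exist when watchCnfFile is called, so an item that is added later is noticed by the child listener but its subsequent value changes are not watched. A possible extension, sketched here as an assumption rather than as part of the original design, is to track which children already have a listener and work out the new ones inside handleChildChange. ChildWatchTracker is a hypothetical helper using only the JDK; the actual subscribeDataChanges call for each returned name is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: remembers which child nodes already have a data listener. */
class ChildWatchTracker {
    private final Set<String> watched = new HashSet<>();

    /**
     * @param currentChildren the children reported by the child listener (null is treated as "none")
     * @return the children seen for the first time; the caller would register a data listener for each
     */
    synchronized List<String> newlyAdded(List<String> currentChildren) {
        List<String> current = currentChildren == null ? Collections.<String>emptyList() : currentChildren;
        List<String> added = new ArrayList<>();
        for (String child : current) {
            if (watched.add(child)) {
                added.add(child);
            }
        }
        watched.retainAll(current); // forget children that were deleted
        return added;
    }

    public static void main(String[] args) {
        ChildWatchTracker tracker = new ChildWatchTracker();
        System.out.println(tracker.newlyAdded(Arrays.asList("abc.gc.a", "abc.gc.b")));
        System.out.println(tracker.newlyAdded(Arrays.asList("abc.gc.a", "abc.gc.c")));
    }
}
```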

⑨ Merging events

We have seen that changing configuration items can trigger many listener events, which is not what we want. Going back to ConfigureReader: its ChangeHandler interface delivers the complete set of properties after a change, so all changes to a file that occur within the 5-second waitTime can be combined into a single event.

/**
 * Handler for configuration file changes.
 */
interface ChangeHandler {
    /**
     * Called after the configuration file changes, with the complete set of properties.
     * @param newProp the complete, updated properties
     */
    void itemChange(Properties newProp);
}

In ZkConfigureCenter.java the idea is simple: submitted changes are grouped by a 5-second wait. Each new change cancels, via cancel(), any previously scheduled task that has not run yet, removes it from futureList, and schedules a new task. Only the last change therefore fires the handler, once things have been quiet for 5 seconds. So rather than truly merging events, we simply keep the last future in futureList.

/**
 * Merge change events: changes occurring within waitTime seconds are merged into one event.
 * @param futureList list of the scheduled tasks
 * @param scheduled executor that runs the scheduled tasks
 * @param waitTime delay in seconds before the handler fires
 * @param fileName path of the configuration file node
 * @param changeHandler handler to notify
 */
private void triggerHandler(List<ScheduledFuture<?>> futureList, ScheduledThreadPoolExecutor scheduled,
        int waitTime, String fileName, ChangeHandler changeHandler) {
    if (futureList != null && !futureList.isEmpty()) {
        for (int i = 0; i < futureList.size(); i++) {
            ScheduledFuture<?> future = futureList.get(i);
            // Cancel and drop every task that is still waiting to run
            if (future != null && !future.isCancelled() && !future.isDone()) {
                future.cancel(true);
                futureList.remove(future);
                i--;
            }
        }
    }
    // Schedule a fresh task; it fires only if no further change arrives within waitTime
    ScheduledFuture<?> future = scheduled.schedule(() -> {
        Properties p = loadCnfFile(fileName);
        changeHandler.itemChange(p);
    }, waitTime, TimeUnit.SECONDS);
    futureList.add(future);
}
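The same cancel-and-reschedule idea can be tried out without ZooKeeper. The sketch below uses only the JDK; ChangeMerger is a hypothetical name, and the quiet period is shortened to 200 milliseconds so that the demo finishes quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: many change events in, one notification out. */
class ChangeMerger {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long quietMillis;
    private final Runnable handler;
    private ScheduledFuture<?> pending; // guarded by "this"

    ChangeMerger(long quietMillis, Runnable handler) {
        this.quietMillis = quietMillis;
        this.handler = handler;
    }

    /** Each change cancels the task that is still waiting and restarts the quiet period. */
    synchronized void onChange() {
        if (pending != null) {
            pending.cancel(false); // harmless if the task has already run
        }
        pending = scheduler.schedule(handler, quietMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger notifications = new AtomicInteger();
        ChangeMerger merger = new ChangeMerger(200, notifications::incrementAndGet);
        for (int i = 0; i < 10; i++) {
            merger.onChange(); // a burst of ten changes
        }
        Thread.sleep(600); // wait until the quiet period has passed
        System.out.println("notifications: " + notifications.get());
        merger.shutdown();
    }
}
```

Note that this is a "quiet period" rather than a fixed window: as long as changes keep arriving faster than the wait time, the notification keeps being postponed.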

This concludes the simulated implementation of the configuration center. The code of the whole class is spread across ① to ⑨ and can be copied and pasted directly.

Finally

The configuration center boils down to the following four points:

Persistent node + Watch mechanism + distributed lock + event merge

The Master topic and an introduction to some of the material on ZooKeeper's official website are left for the next post.

Next: High concurrency from scratch (6) – The classic application scenario of Zookeeper 3