Author: link

What is a ZooKeeper

Zookeeper is a distributed, open source coordination service for distributed applications. It proposes a set of simple primitives on which distributed applications can build higher-level distributed services for synchronization, configuration management, grouping, naming, and so on. Designed to be easy to program, Zookeeper uses a data model similar to a file system’s directory tree structure and runs in Java with Java and C bindings.

Coordination services in distributed systems are notoriously difficult to implement correctly, especially prone to errors such as race conditions and deadlocks. The motivation behind Zookeeper is to make it easier for distributed applications to implement coordinated services from scratch.

The data model

Zookeeper maintains a hierarchical data structure that is very similar to a standard file system, as shown below:

The Zookeeper data structure has the following characteristics:

  1. Each subdirectory entry, such as NameService, is called a Znode, which is uniquely identified by its path, such as Server1, which is identified as /NameService/Server1
  2. Znodes can have child directories and each Znode can store data. Note that EPHEMERAL directory nodes cannot have child directories
  3. Znodes have versions. Each ZNode can store multiple versions of data. That is, one access path can store multiple copies of data
  4. A ZNode can be a temporary node. Once the client that created the ZNode loses contact with the server, the ZNode will be automatically deleted. The Communication between the Zookeeper client and the server adopts the long connection mode, and each client and server keep the connection through heartbeat, and the connection state is called session. If the ZNode is temporary, the session is invalidated and the ZNode is deleted
  5. Znode directory names can be automatically numbered. If App1 already exists, it will be automatically named App2
  6. Znode can be monitored, including the change of data stored in the directory node and the change of sub-node directory. Once the change is made, the monitoring client can be notified. This is the core feature of Zookeeper, and many functions of Zookeeper are realized based on this feature.

    Simple API

    One of Zookeeper’s design goals is to provide a simple programming interface. Thus, it only provides the following operations:

  • Create: Creates a node at a specific address in the tree
  • Delete: deletes a node
  • Exists: checks whether a node exists on an address
  • Get Data: reads data from a node
  • Set data: Writes data to a node
  • Get children: Retrieves the list of child nodes
  • Sync: Waits for data transmission to complete

Who’s in?

For example, The backstage of Mi Chat uses ZooKeeper as a unified collaboration system of distributed services. ZooKeeper is also widely used and modified by Alibaba developers, who have made TaoKeeper open source to suit their business needs. ZooKeeper is also used in Apache HBase, Apache Kafka, and Facebook Message.

Application scenarios

  • Scenarios with small data volumes but high data reliability requirements, such as managing collaborative data for distributed applications.

    What not to do

  • ZooKeeper is not suitable for mass storage because it is mainly used to manage critical data for distributed application collaboration. Different applications have different requirements for large amounts of data, such as consistency and persistence, so it is best practice to separate application data from collaboration data when designing applications, and there are many options for large amounts of data, such as databases or distributed file systems.
  • Do not let the ZooKeeper Server manage the application cache. Instead, delegate this task to the ZooKeeper client, as this can complicate ZooKeeper’s design. For example, allowing ZooKeeper to manage cache invalidation can result in ZooKeeper being stuck waiting for a client to confirm a cache invalidation request, because the corresponding cache data needs to be confirmed before any write operation can be performed.

The Node.js application communicates with the ZooKeeper Server

So how to implement node.js application as a part of the whole heterogeneous distributed system, which needs to operate ZNode on ZooKeeper Server as a client? In fact, as a client, you only need to operate zNode as a file, and you can monitor the changes of ZNode, which is very convenient. This article only describes how to implement the ZooKeeper client role using Node.js.

node-zookeeper

Node-zookeeper is a Node.js client implementation of ZooKeeper. This module is implemented based on C API provided by ZooKeeper native.

Download NPM Install ZooKeeper

chestnuts

var ZooKeeper = require ("zookeeper"); Var zk = new ZooKeeper({connect: "localhost:8888" // zk server address and monitoring port number,timeout: 200000 // in milliseconds,debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARN ,host_order_deterministic: false }); zk.connect(function (err) { if(err) throw err; console.log ("zk session established, id=%s", zk.client_id); zk.a_create ("/node.js1", "some value", ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path) { if (rc ! = 0) { console.log ("zk node create result: %d, error: '%s', path=%s", rc, error, path); } else { console.log ("created zk node %s", path); process.nextTick(function () { zk.close (); }); }}); });Copy the code

Among them:

  • Connect: contains the host name and port of the ZooKeeper server.
  • Timeout: indicates the maximum time that ZooKeeper waits for the client to communicate, in milliseconds. After that, the session is declared dead. ZooKeeper sessions generally have a timeout period of 5-10 seconds.
  • Debug_level: Sets the output level of logs. There are four levels:ZOO_LOG_LEVEL_ERROR.ZOO_LOG_LEVEL_WARN.ZOO_LOG_LEVEL_INFO.ZOO_LOG_LEVEL_DEBUG
  • Host_order_deterministic: After initializing the ZK client instance, check whether the zK client instance is connected to the hosts in the ZooKeeper Server cluster in the specified sequence until the connection succeeds or the session is disconnected.

Common API:

  • Connect () : connects to the ZooKeeper Server
  • A_create (path, data, flags, path_cb): Creates a zNode and assigns a value to determine the node type of the znode (permanent, temporary, permanently ordered, temporarily ordered).
  • A_get (path, watch, data_cb): path: the zonde node path where we want to get data. Watch: indicates whether we want to listen for subsequent data changes on this node. Data_cb (rc,error, stat, data): rc: return code. 0 indicates success. Error: indicates an error message. Stat: metadata information of znode. Data: data in a Znode.
  • A_set (path, data, version, stat_cb): ZooKeeper does not allow partial writing or reading of ZNode data. When setting or reading data of a Znode, the zNode content may be replaced or read entirely. Path: We want to set the zonde node path for the data. Data: The data we want to set. A ZNode can contain any data stored as a byte array. The format of byte arrays is specific to each application implementation. ZooKeeper does not directly provide parsing support. Users can use serialization protocols such as Protobuf, Thrift, Avro, or MessagePack to handle data formats stored in ZNode. Generally, utF-8 encoded strings are sufficient. Version: Indicates the version of zNode, extracted from stat. Data_cb (rc, error, stat): Sets the callback of data.
  • Close (): closes the client connection
  • A_exists (path, watch, stat_cb): Determines whether a ZNode exists
  • adelete( path, version, voidCb) : Delete zNode, end with”“In order not to reserve words.”delete“Conflict…

Implement ORM CURD on the specified ZNode data

'use strict'

const ZooKeeper = require('zookeeper');
const logger = require('../logger/index.js'); // 打日志的工具
const Promise = require('bluebird');
const _ = require('lodash');
let node_env = process.env.NODE_ENV ? process.env.NODE_ENV: 'development';
let connect = node_env === 'development' ? 'zktest.imweb.com:8888' : 'zk.imweb.oa.com:8888';
let timeout = 200000; // 单位毫秒
let path = node_env === 'development' ? '/zk_test/blackList' : '/zk/blackList';
let debug_level = ZooKeeper.ZOO_LOG_LEVEL_WARN;
let host_order_deterministic = false;
let defaultInitOpt = {
    connect,
    timeout,
    debug_level,
    host_order_deterministic
};

class ZK {
    constructor(opt) {
        this.opt = opt;
        this._initZook();
    }

    _initZook() {
        this.zookeeper = new ZooKeeper(this.opt.initOpt || defaultInitOpt);
    }

    /**
     * [get zookeeper blackList]
     * @return {[type]}            [description]
     */
    get() {
        return new Promise((resolve, reject) => {
            let self = this;
            self.zookeeper.connect(function(error) {
                if (error) {
                    reject(error);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(err);
                    } else {
                        logger.info('get zk node: ' + data)
                        resolve(data);                        
                    }
                    process.nextTick(() => {self.zookeeper.close();});
                })
            });
        });
    }

    /**
     * [set zookeeper black_list]
     * @param {object}   opt: 
     * {
     *     380533076: {
     *         "anchor_uin": 380533076,
     *         "expired_time": 1462876279
     *     },
     *     380533077: {
     *         "anchor_uin": 380533077,
     *         "expired_time": 1462876279
     *     },
     * }
     */
    set(opt) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        _.extend(zkData.data, opt);
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);

                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }

    /**
     * [delete zookeeper znode]
     * @param  {array}   keys     [要删除的黑名单的QQ号]
     * @return {[type]}            [description]
     */
    delete(keys) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        for (let key of keys) {
                            delete zkData.data[key];
                        }

                        let currVersion = stat.version; // 只对这个znode被读取时的这个ersion,否则会抛错。
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        })

    }

    /**
     * [add description]
     * @param {[type]}   opt      [description]
     */
    add(opt) {
        // zookeeper只能以覆盖的方式set
        return this.set(opt);
    }

    clear() {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('stat: ', stat);

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        zkData.data = '';
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node clear result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('clear zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }
}

module.exports = ZK;
Copy the code

Ivweb. IO /topic/579db…