Zookeeper + Kafka cluster setup

Environment requirements (pre-install)

  • Install the Java development environment (JDK 1.8) on CentOS
  • Install Supervisor on CentOS

zookeeper cluster

  1. Download and install ZK
    • Download Zookeeper
    • Install Zookeeper
# 0. Set cluster hosts for subsequent configuration
vim /etc/hosts
    172.1.1.1 Data_Center_ZK_1
    172.1.1.2 Data_Center_ZK_2
    172.1.1.3 Data_Center_ZK_3

# 1. Unpack and cd to the root
tar xzf zookeeper-3.4.10.tar.gz && cd zookeeper-3.4.10

# 2. Configure standalone ZK (for reference only)
# cp conf/zoo_sample.cfg conf/zoo.cfg
# vim conf/zoo.cfg
# tickTime=2000
# initLimit=10
# syncLimit=5
# dataDir=/opt/data/zookeeper
# clientPort=2181
# maxClientCnxns=60
# autopurge.snapRetainCount=3
# autopurge.purgeInterval=24
# 2. Configure cluster ZK
# Note that the server IDs in a ZK cluster must be unique
vim /opt/data/zookeeper/myid # specify the ID of each ZK server, e.g. 1, 2, 3
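# A non-interactive sketch for node 1 (use 2 and 3 on the other nodes):
# mkdir -p /opt/data/zookeeper && echo 1 > /opt/data/zookeeper/myid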
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg 
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/data/zookeeper
    clientPort=2181
    maxClientCnxns=60
    autopurge.purgeInterval=24
    server.1=Data_Center_ZK_1:2888:3888
    server.2=Data_Center_ZK_2:2888:3888
    server.3=Data_Center_ZK_3:2888:3888
    
# 3. Configure the Java heap size (2G of a 4G machine)
# Note that ZK should avoid swapping as much as possible; performance degrades significantly otherwise
# Here the initial ZK JVM heap is 512MB, with a maximum of 2G, on a machine with 4G of total memory
vim conf/java.env
    export JVMFLAGS="-Xmx2048m -Xms512m"
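    # conf/java.env is sourced by bin/zkEnv.sh at startup, so these flags
    # take effect on the next (re)start of the service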
    
# 4. Start the service
bin/zkServer.sh start
# bin/zkServer.sh stop
bin/zkServer.sh status
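# Optional health check via ZK's four-letter words (assumes nc is installed):
# echo ruok | nc Data_Center_ZK_1 2181   # a healthy server replies "imok"
# echo srvr | nc Data_Center_ZK_1 2181   # shows Mode: leader or follower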

# The above steps must be performed on each of the three servers
# 5. Client connection test for cluster availability
bin/zkCli.sh -server Data_Center_ZK_1:2181
    help
    ls /
    create /test "hello world!"
    get /test
bin/zkCli.sh -server Data_Center_ZK_3:2181    
    help
    ls /
    get /test
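# Optional: verify quorum tolerance -- stop one node (e.g. Data_Center_ZK_2)
# and the remaining two should keep serving as leader + follower:
# bin/zkServer.sh stop     # on Data_Center_ZK_2
# bin/zkServer.sh status   # on the other nodes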
  2. References
    • Zookeeper Admin

kafka cluster

  1. Download and install Kafka
    • Download kafka_2.11
    • Install Kafka
# 0. Set cluster hosts for subsequent configuration
vim /etc/hosts
    172.1.1.1 Data_Center_Kafka_1
    172.1.1.2 Data_Center_Kafka_2
    172.1.1.3 Data_Center_Kafka_3

# 1. Unpack
tar xzf kafka_2.11-1.0.0.tgz && cd kafka_2.11-1.0.0

# 2. Cluster configuration
# Kafka uses ZooKeeper. Ensure the ZooKeeper cluster is up and reachable before starting Kafka
vim config/server.properties
    # The id of the broker. The 3 servers must be configured with different broker ids
    broker.id=1
    # Zookeeper connection string
    zookeeper.connect=Data_Center_ZK_1:2181,Data_Center_ZK_2:2181,Data_Center_ZK_3:2181
    # Configure the socket server
    advertised.host.name=Data_Center_Kafka_1
    advertised.port=9092
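    # For brokers 2 and 3 only these values change, e.g. on node 2:
    #   broker.id=2
    #   advertised.host.name=Data_Center_Kafka_2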

# 3. Start the service
bin/kafka-server-start.sh config/server.properties
# bin/kafka-server-stop.sh

# The above steps must be performed on each of the three servers
# 4. Client connection test
# View all topics
bin/kafka-topics.sh --list --zookeeper Data_Center_ZK_1:2181
# Create a topic
bin/kafka-topics.sh --create --zookeeper Data_Center_ZK_1:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
# Run the producer and then type a few messages into the console to send to the server.
bin/kafka-console-producer.sh --broker-list Data_Center_Kafka_1:9092 --topic my-replicated-topic
# Start a consumer
bin/kafka-console-consumer.sh --bootstrap-server Data_Center_Kafka_2:9092 --topic my-replicated-topic --from-beginning
# check the status of multiple replica topics
bin/kafka-topics.sh --describe --zookeeper Data_Center_ZK_1:2181 --topic my-replicated-topic
# Example output:
# Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 0,2,1
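# Leader: the broker currently serving reads/writes for this partition
# Replicas: brokers holding a copy of the log; Isr: the in-sync subset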

# 5. Check Kafka status in ZK
/opt/tools/zookeeper-3.4.10/bin/zkCli.sh -server Data_Center_ZK_1:2181
    help
    ls /
    ls /brokers
    ls /consumers
    ls /config

  2. References
    • kafka quickstart
    • Kafka series of storage structures in ZooKeeper

Cluster high availability

  1. Monitor application services using Supervisor
    • Supervisor Installation Reference
    • Configure the supervisor
    # Add configs: Zookeeper and Kafka daemons
    vim /etc/supervisord.conf
    
        [program:zookeeper]
        ;command=/opt/tools/zookeeper-3.4.10/bin/zkServer.sh start
        command=/opt/tools/zookeeper-3.4.10/bin/zkServer.sh start-foreground

        [program:kafka]
        ;command=/opt/tools/kafka_2.11-1.0.0/bin/kafka-server-start.sh
        command=/opt/tools/kafka_2.11-1.0.0/bin/kafka-server-start.sh /opt/tools/kafka_2.11-1.0.0/config/server.properties

    # start
    supervisord -c /etc/supervisord.conf
    # check status
    supervisorctl status all
  2. Use systemd to start supervisord at boot (see the unit-file sketch below)
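    A minimal unit-file sketch for this (the supervisord binary path and config location are assumptions; adjust them to your install):
    # vim /etc/systemd/system/supervisord.service
        [Unit]
        Description=Supervisor process control system
        After=network.target

        [Service]
        ; -n keeps supervisord in the foreground, as systemd expects
        ExecStart=/usr/bin/supervisord -n -c /etc/supervisord.conf
        ExecStop=/usr/bin/supervisorctl shutdown
        Restart=on-failure

        [Install]
        WantedBy=multi-user.target
    # Enable at boot and start now
    systemctl enable supervisord
    systemctl start supervisord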

Zookeeper + Storm cluster setup

Environment requirements (pre-install)

  • Install the Java development environment (JDK 1.8) on CentOS
  • Install Supervisor on CentOS

For details about how to set up a ZooKeeper cluster, see Part 1

storm cluster

  1. Storm installation precautions
    • Storm uses Zookeeper for coordinating the cluster.
    • Single node Zookeeper clusters should be sufficient for most cases
    • It’s critical that you run Zookeeper under supervision
    • It’s critical that you set up a cron to compact Zookeeper’s data and transaction logs.
  2. Install dependencies on Nimbus and worker machines
    • Python 2.7 (check with python --version)
    • Install the Java development environment (JDK 1.8) on CentOS
    • Install Supervisor on CentOS
  3. Install the configuration
    • Storm 1.2.1 download address
    • See Setting Up a Development Environment

# 0. Set cluster hosts for subsequent configuration
vim /etc/hosts
    172.1.1.1 Data_Center_Storm_1
    172.1.1.2 Data_Center_Storm_2
    172.1.1.3 Data_Center_Storm_3
mkdir -p /opt/data/storm

# 1. Download and extract a Storm release to Nimbus and worker machines
tar xzf apache-storm-1.2.1.tar.gz -C /opt/tools/
cd /opt/tools/apache-storm-1.2.1

# 2. Fill in mandatory configurations into storm.yaml
vim conf/storm.yaml
    storm.local.dir: "/opt/data/storm"
    storm.zookeeper.servers:
        - "Data_Center_ZK_1"
        - "Data_Center_ZK_2"
        - "Data_Center_ZK_3 nimbus.seeds : ["Data_Center_Storm_1"] drpc.servers: - "Data_Center_Storm_1"# -"Data_Center_Storm_2"# -"Data_Center_Storm_3"Drpc. port: 3772 # default configuration # 3. Launch daemons under supervision using"storm# Enable Nimbus, Supervisor, UI nohup Storm Nimbus & Nohup Storm Supervisor on storm-1 Nohup Storm UI & Nohup storm DRPC & # Start supervisor on Storm-2 and Storm-3 http://Data_Center_Storm_1:8080 # to check storm cluster statusCopy the code

Cluster high availability

  1. Monitor application services using Supervisor
    • Supervisor Installation Reference
    • Configure the supervisor
    # Add configs: storm-nimbus | storm-supervisor | storm-ui | storm-drpc
    # Note that UI and Nimbus are only set up on node 1
    vim /etc/supervisord.conf

        [program:storm_nimbus]
        ; nohup storm nimbus &
        command=/opt/tools/apache-storm-1.2.1/bin/storm nimbus

        [program:storm_supervisor]
        ; nohup storm supervisor &
        command=/opt/tools/apache-storm-1.2.1/bin/storm supervisor

        [program:storm_ui]
        ; nohup storm ui &
        command=/opt/tools/apache-storm-1.2.1/bin/storm ui

        [program:storm_drpc]
        ; nohup storm drpc &
        command=/opt/tools/apache-storm-1.2.1/bin/storm drpc

    # start
    supervisord -c /etc/supervisord.conf
    # check status
    supervisorctl status all
  2. Use systemd to start supervisord at boot (see the unit-file sketch in Part 1)

Cluster test: ZooKeeper + Kafka + Storm

Configure the client development environment

  • Install JDK1.8
  • Install Git
  • Install Maven
  • Install Storm
    • Download the latest source code (for the examples)
    • Download the latest release (for the client bin environment)
# 0. Unpack the Storm release
tar xzf software/apache-storm-1.2.1.tar.gz -C tools/

# 1. Configure the environment
vim /etc/profile.d/global_ops_cmd.sh
    export JAVA_HOME="/ usr/Java/jdk1.8.0 _161"
    export MVN_HOME="/ opt/tools/apache maven -- 3.5.2"
    export STORM_HOME="/ opt/tools/apache - storm - 1.2.1." "

    export PATH="$PATH:$STORM_HOME/bin:$MVN_HOME/bin"
. /etc/profile.d/global_ops_cmd.sh

# 2. Configure the remote cluster information and specify the nimbus server nodes
# The local Storm configs are the ones in ~/.storm/storm.yaml merged with the configs in defaults.yaml
vim ~/.storm/storm.yaml
    nimbus.seeds: ["Data_Center_Storm_1"]

# 3. Check the cluster topology status
storm list
# If not configured locally, you can pass it as a command line argument
# storm list -c nimbus.host=Data_Center_Storm_1

# 4. Other common commands on Storm client
storm kill topology-name [-w wait-time-secs]
storm activate topology-name
storm deactivate topology-name
storm jar topology-jar-path class ...

# Clone the repo (the example code lives in the release tags)
cd /opt/apps
git clone git://github.com/apache/storm.git 

  • References
    • Setting-up-development-environment
    • Creating-a-new-Storm-project
    • Tutorial
    • Storm command line client

Run storm-starter examples

# Root dir
cd /opt/apps/storm/

# 1. Switch to the code version you need, to avoid version-mismatch problems
# Since the storm cluster version here is 1.2.1, we check out the corresponding tag
git tag
git checkout tags/v1.2.1
cd /opt/apps/storm/examples/storm-starter

mvn clean package

# Run the WordCountTopology in remote/cluster mode under the name "WordCountProduction"
storm jar target/storm-starter-*.jar org.apache.storm.starter.WordCountTopology WordCountProduction remote

# Run the RollingTopWords in remote/cluster mode,
# under the name "production-topw-1"
storm jar target/storm-starter-*.jar org.apache.storm.starter.RollingTopWords production-topw-1 remote
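# Optional local sanity check (a sketch: in the 1.2.x storm-starter,
# running with no topology-name argument typically falls back to local mode)
# storm jar target/storm-starter-*.jar org.apache.storm.starter.WordCountTopology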


Run storm-kafka-client examples

# 1. Switch to the code version you need, to avoid version-mismatch problems
# Since the storm cluster version here is 1.2.1, we check out the corresponding tag
git tag
git checkout tags/v1.2.1
cd examples/storm-kafka-client-examples/

# 2. Modify project dependencies
# We need to specify scope as compile explicitly, otherwise NoClassDefFoundError may occur
vim ./pom.xml
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${project.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>${storm.kafka.artifact.id}</artifactId>
            <version>${storm.kafka.client.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${storm.kafka.client.version}</version>
            <scope>compile</scope>
        </dependency>

# 3. Modify the example code
vim src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
        // # Update the parameter list of the original newKafkaTridentSpoutOpaque method, as follows:
        private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(String broker, String topic1, String topic2) { //...
        // # Likewise update the parameter list of newKafkaSpoutConfig:
        protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String broker, String topic1, String topic2) { //...
        
        // The default is the local-mode DRPC client:
        // # DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
        // When running in remote mode, replace it with the following:
        Thread.sleep(2000);
        Config drpc = new Config();
        drpc.setDebug(false);
        drpc.put("storm.thrift.transport"."org.apache.storm.security.auth.SimpleTransportPlugin"); //"backtype.storm.security.auth.SimpleTransportPlugin");
        drpc.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        drpc.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        drpc.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
        drpc.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
        System.out.printf("drpc config: %s \n", drpc);
        try {
            DrpcResultsPrinter client = DrpcResultsPrinter.remoteClient(drpc, "Data_Center_Storm_1", 3772);
            System.out.printf("client: %s \n", client);
            client.printResults(60, 1, TimeUnit.SECONDS);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            System.out.printf("finally \n");
        }
            
# 4. Package using Maven
# Pass kafka_artifact_id and kafka_broker_version to match the installed broker
# mvn clean package -Dstorm.kafka.artifact.id=<kafka_artifact_id> -Dstorm.kafka.client.version=<kafka_broker_version>
# The version installed here is kafka_2.11-1.0.0
mvn clean package -Dstorm.kafka.artifact.id=kafka_2.11 -Dstorm.kafka.client.version=1.0.0

# 5. Upload the Storm topology
# Note that the following 4 parameters are:
# the kafka node; the name of topology 1 (used to produce msg data); the name of topology 2 (used to produce msg data); remote execution (non-local mode)
storm jar target/storm-kafka-client-examples-1.2.1.jar org.apache.storm.kafka.trident.TridentKafkaClientWordCountNamedTopics Data_Center_Kafka_2:9092 kafka-prod-1 kafka-prod-2 remote
# storm -c nimbus.host=Data_Center_Storm_1 jar target/storm-kafka-client-examples-1.2.1.jar org.apache.storm.kafka.trident.TridentKafkaClientWordCountNamedTopics
# Exceptions that may be encountered:
# 1. Dependency problem, partial dependency cannot be found
# Error: A JNI error has occurred, please check your installation and try again
# Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/storm/kafka/...
# Refer to Step 2 above


# 2. Kafka producer cannot write
# org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.	
# org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
# Change the Kafka configuration
vim config/server.properties
    # Configure the socket server
    advertised.host.name=Data_Center_Kafka_1
    advertised.port=9092
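# Then re-test reachability from the client machine (the advertised hostname
# must resolve there, e.g. via /etc/hosts):
# bin/kafka-console-producer.sh --broker-list Data_Center_Kafka_1:9092 --topic my-replicated-topic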


# 3. Kafka consumer cannot connect to DRPC
# java.lang.RuntimeException: 
# No DRPC servers configured for topology
#     at org.apache.storm.drpc.DRPCSpout.open(DRPCSpout.java:149)
#     at org.apache.storm.trident.spout.RichSpoutBatchTriggerer.open(RichSpoutBatchTriggerer.java:...)

# (1) Start the DRPC server
vim /opt/tools/apache-storm-1.2.1/conf/storm.yaml
    drpc.servers:
        - "Data_Center_Storm_1"
        #- "Data_Center_Storm_2"
        #- "Data_Center_Storm_3"
    drpc.port: 3772

# (2) Configure the connection in the code
vim src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
    Thread.sleep(2000);
    Config drpc = new Config();
    drpc.setDebug(false);
    drpc.put("storm.thrift.transport"."org.apache.storm.security.auth.SimpleTransportPlugin"); //"backtype.storm.security.auth.SimpleTransportPlugin");
    drpc.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
    drpc.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
    drpc.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
    drpc.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
    System.out.printf("drpc config: %s \n", drpc);
    try {
        DrpcResultsPrinter client = DrpcResultsPrinter.remoteClient(drpc, "Data_Center_Storm_1", 3772);
        System.out.printf("client: %s \n", client);
        client.printResults(60, 1, TimeUnit.SECONDS);
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        System.out.printf("finally \n");
    }

  • Storm 1.2.1 Kafka <= 0.8.x
  • Storm 1.2.2 kafka >= 0.10
  • Storm DRPC
  • Storm DRPC ERROR
  • Details on Storm configuration items

More references

  • Zookeeper Maintenance
  • Zookeeper Supervision
  • Zookeeper Monitoring
  • Zookeeper Logging
  • Zookeeper Admin
  • ZooKeeper Getting Started Guide
  • How to set zk java heap
  • Storm Rationale comment summary
  • Storm Video Tutorial – ETE 2012
  • Storm Documentation Index
  • Storm Main Page
  • Storm Tutorial
  • streamparse python
  • Running Apache Storm under Supervision: Supervisord
  • Streamparse io