Prerequisites

  • docker-compose
  • Node.js, calling Kafka through kafka-node

Kafka image selection:

wurstmeister/kafka

Installation steps

  1. docker pull wurstmeister/kafka
  2. git clone github.com/wurstmeiste…


  1. Go into the cloned directory, edit the docker-compose.yml file, and change the KAFKA_ADVERTISED_HOST_NAME field to your host's actual IP address:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 10.6.19.121
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  2. Start the cluster with `docker-compose up -d`. You can also scale out to multiple Kafka brokers with `docker-compose scale kafka=3`. Shut the cluster down with `docker-compose stop`.
  3. After the cluster starts successfully, run `docker ps` to check its status. Note that the mapped host port (for example 32772) is the port you actually connect to, not 9092.

Test

  • Run `docker ps` to get the container ID of the Kafka service, then enter the container with `docker exec -it {dockerId} /bin/bash`.
  • Go to the directory `/opt/kafka_2.12-2.2.0/bin`; the scripts used below live there.
  • Create a topic: `kafka-topics.sh --create --zookeeper {zookeeperName} --replication-factor 1 --partitions 1 --topic {topicName}`
  • List topics: `kafka-topics.sh --list --zookeeper {zookeeperName}` (the {zookeeperName} can be obtained from `docker ps`)
  • Produce messages: `kafka-console-producer.sh --topic=test --broker-list localhost:9092`
  • Consume messages: `kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test`
  • Exit the producer or consumer with Ctrl-C. If messages flow from producer to consumer, the Kafka deployment works and you can continue.

Integrating Kafka with Node.js

Producer

const kafka = require('kafka-node')
const Producer = kafka.Producer
// Connect to the broker on its mapped host port
const client = new kafka.KafkaClient({kafkaHost: '10.6.18.78:32776'})
const producer = new Producer(client)
const payloads = [
  { topic: 'test', messages: '232323311', partition: 0 }
]
producer.on('ready', function () {
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error('Send failed', err)
    } else {
      console.log(data)
    }
  })
})
producer.on('error', function (err) {
  console.error(err)
})

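The send callback above reports failures but gives up after the first attempt. A minimal retry wrapper can be sketched in plain JavaScript; note that `sendWithRetry` and the fake sender below are illustrative helpers, not part of the kafka-node API:

```javascript
// Retry a callback-style send up to `retries` extra times (illustrative helper).
function sendWithRetry(sendFn, payloads, retries, cb) {
  sendFn(payloads, function (err, data) {
    if (err && retries > 0) {
      // Failed: try again with one fewer attempt remaining.
      return sendWithRetry(sendFn, payloads, retries - 1, cb)
    }
    cb(err, data)
  })
}

// Demo with a fake sender that fails twice before succeeding.
let calls = 0
let result = null
const fakeSend = (payloads, cb) => {
  calls += 1
  if (calls < 3) cb(new Error('broker unavailable'))
  else cb(null, { test: { '0': 42 } })
}
sendWithRetry(fakeSend, [{ topic: 'test', messages: 'hi' }], 5, (err, data) => {
  result = data
})
console.log(calls, result)
```

With a real producer you would pass `producer.send.bind(producer)` as `sendFn`; the demo uses a synchronous fake so the control flow is easy to follow.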

Consumer

var kafka = require('kafka-node'),
  Consumer = kafka.Consumer,
  client = new kafka.KafkaClient({kafkaHost: '10.6.18.78:32776'}),
  consumer = new Consumer(
    client,
    [
      { topic: 'test' }
    ],
    {
      groupId: 'test'
    }
  )
consumer.on('message', message => {
  console.log(message)
  // Manually record the offset of the last message seen
  consumer.setOffset(message.topic, message.partition, message.offset)
})
consumer.on('error', error => {
  console.error('Consumption error', error)
})

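The `setOffset` call above pins the consumer to the offset of the last message it saw. The per-topic, per-partition bookkeeping this implies can be sketched independently of kafka-node; the `createOffsetTracker` helper below is illustrative, not a library API:

```javascript
// Minimal offset tracker mirroring the manual bookkeeping in the consumer.
// Keys are "topic:partition"; values are the next offset to read.
function createOffsetTracker() {
  const offsets = new Map()
  return {
    // Record that the message at `offset` was processed.
    commit(topic, partition, offset) {
      offsets.set(`${topic}:${partition}`, offset + 1)
    },
    // Where to resume reading for this topic/partition (0 if never seen).
    next(topic, partition) {
      return offsets.get(`${topic}:${partition}`) || 0
    }
  }
}

const tracker = createOffsetTracker()
tracker.commit('test', 0, 0)
tracker.commit('test', 0, 1)
console.log(tracker.next('test', 0)) // 2
console.log(tracker.next('test', 1)) // 0
```

Committing `offset + 1` means a restart resumes after the last processed message, which is the usual at-least-once convention.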

The printed message from the consumer:
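The original captured output did not survive here. A kafka-node consumer message is a plain object roughly of the following shape; the field values below are illustrative, not real output:

```javascript
// Approximate shape of the object passed to the 'message' handler
// (field values here are made up for illustration).
const message = {
  topic: 'test',
  value: '232323311',
  offset: 0,
  partition: 0,
  highWaterOffset: 1,
  key: null
}
console.log(message)
```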