Prerequisites
- docker-compose
- Node.js calls Kafka using kafka-node
Kafka Image selection:
wurstmeister/kafka
Installation steps
- docker pull wurstmeister/kafka
- git clone github.com/wurstmeiste…
- Go to the cloned directory, edit the docker-compose.yml file, and change the KAFKA_ADVERTISED_HOST_NAME field to the actual host IP address:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 10.6.19.121
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
- Start the cluster:
docker-compose up -d
You can also scale the Kafka service to multiple containers with docker-compose scale kafka=3. The shutdown command is docker-compose stop.
- After the cluster starts successfully, run docker ps to check the startup status. Note that the mapped host port (32772 here) is the port you actually access, not 9092; a quick connectivity check from Node.js is sketched below.
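To confirm the broker is reachable from Node.js before going further, here is a minimal kafka-node connection sketch. The address 10.6.19.121:32772 is only an example built from the host IP above and a mapped port reported by docker ps; substitute your own values.
// Minimal connectivity check with kafka-node (sketch, address is an example)
const kafka = require('kafka-node')

// Use the host IP plus the host port that docker ps shows mapped to 9092
const client = new kafka.KafkaClient({ kafkaHost: '10.6.19.121:32772' })

client.on('ready', () => {
  console.log('Connected to the Kafka broker')
  client.close()
})

client.on('error', (err) => {
  console.error('Connection failed', err)
})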
Test
- Use docker ps to obtain the container ID of the Kafka service, then enter the container with docker exec -it {dockerId} /bin/bash.
- Enter the directory /opt/kafka_2.12-2.2.0/bin, which contains the scripts used below.
- Create a topic
kafka-topics.sh --create --zookeeper {zookeeperName} --replication-factor 1 --partitions 1 --topic {topicName}
- View the list of topics:
kafka-topics.sh --list --zookeeper {zookeeperName}
The {zookeeperName} value can be obtained via docker ps.
- Send messages:
kafka-console-producer.sh --topic=test --broker-list localhost:9092
- Receive messages:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
- Both the producer and consumer consoles can be exited with Ctrl+C. If messages flow from the producer to the consumer, the Kafka deployment works and you can continue; the topic commands above can also be driven from Node.js, as sketched below.
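The same topic operations can be performed programmatically with kafka-node. The sketch below assumes the broker is reachable at 10.6.19.121:32772 (replace with your host IP and mapped port) and uses the client's createTopics call plus the Admin API's listTopics.
// Sketch: create and list topics programmatically instead of via the shell scripts
const kafka = require('kafka-node')

// Assumed broker address; use your host IP and the mapped port from docker ps
const client = new kafka.KafkaClient({ kafkaHost: '10.6.19.121:32772' })

// Mirrors kafka-topics.sh --create with 1 partition and replication factor 1
client.createTopics(
  [{ topic: 'test', partitions: 1, replicationFactor: 1 }],
  (err, result) => {
    if (err) return console.error('Create topic failed', err)
    console.log('Create topic result:', result)

    // Mirrors kafka-topics.sh --list
    const admin = new kafka.Admin(client)
    admin.listTopics((err, topics) => {
      if (err) return console.error('List topics failed', err)
      console.log('Topics:', topics)
    })
  }
)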
Node.js integration with Kafka
First, the producer:
const kafka = require('kafka-node')
const Producer = kafka.Producer
const client = new kafka.KafkaClient({ kafkaHost: '10.6.18.78:32776' })
const producer = new Producer(client)

const payloads = [
  { topic: 'test', messages: '232323311', partition: 0 }
]

producer.on('ready', function () {
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error('Send failed', err)
    } else {
      console.log(data)
    }
  })
})

producer.on('error', function (err) {
  console.error(err)
})
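The payloads passed to producer.send can also carry several messages per topic, as well as keyed messages via kafka-node's KeyedMessage. A short sketch reusing the broker address from above (the key 'user-1' is an arbitrary example):
// Sketch: sending multiple messages and a keyed message in one payload
const kafka = require('kafka-node')
const KeyedMessage = kafka.KeyedMessage

const client = new kafka.KafkaClient({ kafkaHost: '10.6.18.78:32776' })
const producer = new kafka.Producer(client)

producer.on('ready', function () {
  const km = new KeyedMessage('user-1', 'keyed payload') // (key, message)
  const payloads = [
    // messages may be a single string or an array of strings/KeyedMessages
    { topic: 'test', messages: ['first', 'second', km] }
  ]
  producer.send(payloads, function (err, data) {
    if (err) console.error('Send failed', err)
    else console.log(data)
  })
})

producer.on('error', function (err) {
  console.error(err)
})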
Consumer
var kafka = require('kafka-node'),
  Consumer = kafka.Consumer,
  client = new kafka.KafkaClient({ kafkaHost: '10.6.18.78:32776' }),
  consumer = new Consumer(
    client,
    [
      { topic: 'test' }
    ],
    {
      groupId: 'test'
    }
  )

consumer.on('message', message => {
  console.log(message)
  consumer.setOffset(message.topic, message.partition, message.offset)
})

consumer.on('error', error => {
  console.error('Consumption error', error)
})
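The consumer above relies mostly on kafka-node's defaults. The sketch below spells out the common Consumer options (autoCommit, autoCommitIntervalMs, fromOffset, encoding) so the behavior is explicit, again assuming the same broker address:
// Sketch: the same consumer with its options written out explicitly
var kafka = require('kafka-node')

var client = new kafka.KafkaClient({ kafkaHost: '10.6.18.78:32776' })
var consumer = new kafka.Consumer(
  client,
  [{ topic: 'test', partition: 0 }],
  {
    groupId: 'test',
    autoCommit: true,           // commit consumed offsets automatically
    autoCommitIntervalMs: 5000, // commit interval in milliseconds
    fromOffset: false,          // ignore any offset given in the topic payloads
    encoding: 'utf8'            // return message values as utf8 strings
  }
)

consumer.on('message', function (message) {
  console.log(message.topic, message.partition, message.offset, message.value)
})

consumer.on('error', function (error) {
  console.error('Consumption error', error)
})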
The printed result of the consumed message: