Small knowledge, big challenge! This article is participating in the "Essential Tips for Programmers" creation activity.

Kafka installation and startup

  • Run the following command to install Kafka on macOS:

brew install kafka

After installation, the application directory is:

/usr/local/Cellar/kafka/2.0.0

Configuration file directory:

/usr/local/etc/kafka/

The ZooKeeper configuration file is also in this directory; in it you can see that the default ZooKeeper port is 2181.
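For reference, the relevant lines of zookeeper.properties typically look like this (the dataDir value is an assumption for a default Homebrew install and may differ on your machine):

```properties
# /usr/local/etc/kafka/zookeeper.properties (excerpt)
dataDir=/usr/local/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
```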

Starting Kafka and creating a topic

  1. Go to the program's main directory and start ZooKeeper first, then Kafka:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
kafka-server-start /usr/local/etc/kafka/server.properties &
  2. Commands to stop the servers:
bin/kafka-server-stop
bin/zookeeper-server-stop
  3. Go to the bin directory and create a topic, using the default ZooKeeper host and port, with 1 partition, a replication factor of 1, and the topic name topic-test-one:
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-test-one

You can run the following commands to view all topics and their properties.

List the topics:

./kafka-topics --list --zookeeper localhost:2181

Describe the topic:

./kafka-topics --describe --zookeeper localhost:2181 --topic topic-test-one

Accessing Kafka from Node.js

Module: kafka-node

Default single-partition mode

Create a producer first (producer.js):

const kafka = require('kafka-node');
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({ kafkaHost: '192.168.20.59:9092' });

/**
 * Producer options.
 * partitionerType:
 *   0: default, only produces data to the first partition
 *   1: random, messages are randomly distributed across the partitions
 *   2: cyclic, messages are distributed across the partitions in round-robin order
 */
const producerOption = {
  requireAcks: 1,
  ackTimeoutMs: 100,
  partitionerType: 0 // default: first partition only
};
const producer = new Producer(client, producerOption);

/**
 * The topic was created on the command line, which is where the number of
 * partitions and the replication factor are specified. topic-test-one has
 * only one partition, so data can only be sent to partition 0.
 */
function getPayloads() {
  return [
    { topic: 'topic-test-one', messages: [JSON.stringify({ name: 'jack', age: '120' })], partition: 0 }
  ];
}

producer.on('ready', function () {
  setInterval(function () {
    producer.send(getPayloads(), function (err, data) {
      if (!err) {
        console.log('send message complete! data:' + JSON.stringify(data), new Date());
      } else {
        console.log(err);
      }
    });
  }, 1000);
});

producer.on('error', function (err) {
  console.log('send message error!\r\n' + err);
});

Then define a consumer (consumer.js):

const kafka = require('kafka-node');
const client = new kafka.KafkaClient({ kafkaHost: '192.168.20.59:9092' });

const consumerOption = {
  groupId: 'topic-test-one',
  autoCommit: true
};

// Define the consumer: read from partition 0 of topic-test-one
const consumer = new kafka.Consumer(client, [
  { topic: 'topic-test-one', partition: 0 }
], consumerOption);

consumer.on('message', function (message) {
  const info = message.value;
  console.log('receive info from kafka:' + info, new Date());
});

consumer.on('error', function (message) {
  console.log('kafka connection error, message:' + message);
});

Run to see the effect:

Random partition mode

Now create topic-test-two with 2 partitions:

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-test-two

Set partitionerType in the producer's producerOption to 1, and remove the partition field from getPayloads():

const producerOption = {
  requireAcks: 1,
  ackTimeoutMs: 100,
  partitionerType: 1 // random partitioner
};
const producer = new Producer(client, producerOption);

/**
 * topic-test-two was created on the command line with two partitions,
 * so the random partitioner can send each message to either partition.
 * No partition field is given here; kafka-node picks one at random.
 */
function getPayloads() {
  return [
    { topic: 'topic-test-two', messages: [JSON.stringify({ name: 'jack', age: '120' })] }
  ];
}

You can see that the producer sends data to partition 0 and partition 1 at random.

Create two consumers at the same time, one reading from partition 0 and one from partition 1, and observe the effect:

consumer1.js

const consumer = new kafka.Consumer(client, [
  { topic: 'topic-test-two', partition: 0 }
], consumerOption);

consumer2.js

const consumer = new kafka.Consumer(client, [
  { topic: 'topic-test-two', partition: 1 }
], consumerOption);

The consumers receive data from partition 0 and partition 1 respectively, and the timestamps confirm that each one consumes from a different partition.

Sequential partition mode

This mode is set up the same way as random partition mode; you only need to set partitionerType to 2, and messages are then distributed across the partitions in round-robin order.