1. What is Confluent-kafka?

Confluent Platform is a streaming data platform that organizes and manages data from different sources in a stable, efficient system. It provides not only the data transport itself but also the surrounding tooling: tools for connecting data sources, applications, and data sinks. Confluent is built on Kafka and comes in an enterprise edition with a 30-day trial and an open source edition.

Here we use the open source version of Confluent-Kafka, which includes the following components:

For the complete set of tools and the free-trial components, see the website: www.confluent.io/download

For an introduction to what each component does, see: www.cnblogs.com/dadadecheng…

2. Start building Confluent-Kafka

2.1 Installing the latest Confluent-Kafka version

yum install curl which -y
rpm --import https://packages.confluent.io/rpm/5.3/archive.key

Configure the yum repository

cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF

Install Confluent Platform using only the Confluent Community components:

yum clean all && yum install confluent-community-2.12 -y

2.2 Installing JDK 1.8

Download the JDK from the Oracle official website, upload it to the server, and run the following command to install it:

rpm -ivh jdk-8u231-linux-x64.rpm

Oracle jdk1.8 download page: www.oracle.com/technetwork…

2.3 Configuring ZooKeeper

vim /etc/kafka/zookeeper.properties

Cluster configuration:

dataDir=/data/zookeeper/data/
dataLogDir=/data/zookeeper/logs/
maxClientCnxns=500
clientPort=2181
initLimit=5
syncLimit=2
server.1=xxx:2888:3888
server.2=xxx:2888:3888
server.3=xxx:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

Note: zoo1, zoo2, and zoo3 are the host names of the three nodes; use them in place of xxx in the server.N entries. The hosts entries must be configured in advance.

echo "1" > /data/zookeeper/data/myid   # run this on the zoo1 machine
echo "2" > /data/zookeeper/data/myid   # run this on the zoo2 machine
echo "3" > /data/zookeeper/data/myid   # run this on the zoo3 machine
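The number after `server.` in zookeeper.properties has to match the value in that host's myid file. The following is only a small sketch to make that mapping explicit: it prints the server.N entries for a list of hosts, assuming the host names zoo1 to zoo3 and the ports 2888 and 3888 used above. The helper name serverLines is made up for this example.

```go
package main

import "fmt"

// serverLines renders one "server.N=host:2888:3888" entry per host.
// N starts at 1 and is the value that host must have in its myid file.
// (Hypothetical helper, for illustration only.)
func serverLines(hosts []string) []string {
	lines := make([]string, 0, len(hosts))
	for i, h := range hosts {
		lines = append(lines, fmt.Sprintf("server.%d=%s:2888:3888", i+1, h))
	}
	return lines
}

func main() {
	// zoo1..zoo3 are the host names used in this article.
	for _, l := range serverLines([]string{"zoo1", "zoo2", "zoo3"}) {
		fmt.Println(l)
	}
}
```

So the host listed as server.2 is the one whose myid file must contain 2.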

Set directory permissions

chmod 777 -R /data/zookeeper/logs/
chmod 777 -R /data/zookeeper/data/

Enable ZooKeeper at boot and start it

systemctl enable confluent-zookeeper && systemctl start confluent-zookeeper

Check the status

systemctl status confluent-zookeeper

View the log

tail -f /var/log/messages

2.4 Configuring Kafka

vim /etc/kafka/server.properties   # modify the following two options

zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
broker.id.generation.enable=true

Enable Kafka at boot and start it

systemctl enable confluent-kafka && systemctl start confluent-kafka

Check the status

systemctl status confluent-kafka

View the log

tail -f /var/log/messages
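Besides systemctl status and the log, it can help to confirm that the services are actually listening. The sketch below simply tries a TCP connection to the ZooKeeper client port (2181) and the Kafka port (9092, the one used in the commands in section 3) on each node. The host names zoo1 to zoo3 are the ones from this article, and the helper reachable is a made-up name. An open port only shows that something is listening, not that the cluster is healthy.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// reachable reports whether a TCP connection to addr ("host:port")
// succeeds within timeoutMillis milliseconds.
// (Hypothetical helper, for illustration only.)
func reachable(addr string, timeoutMillis int) bool {
	conn, err := net.DialTimeout("tcp", addr, time.Duration(timeoutMillis)*time.Millisecond)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	hosts := []string{"zoo1", "zoo2", "zoo3"} // host names assumed from this article
	ports := []string{"2181", "9092"}         // ZooKeeper client port, Kafka port
	for _, h := range hosts {
		for _, p := range ports {
			addr := net.JoinHostPort(h, p)
			fmt.Printf("%s reachable: %v\n", addr, reachable(addr, 2000))
		}
	}
}
```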

3. Testing Kafka message production

3.1 Creating a topic

kafka-topics --create --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --replication-factor 1 --partitions 5 --topic kafkatest

3.2 Producing messages

kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest

3.3 Consuming messages

kafka-console-consumer --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest --from-beginning

4. Using a Golang demo to measure the time taken to produce messages

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	// Replace ip1..ip3 with the broker addresses.
	address := []string{"ip1:9092", "ip2:9092", "ip3:9092"}
	producer(address)
}

func producer(address []string) {
	config := sarama.NewConfig()
	// A SyncProducer requires Return.Successes to be true.
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		// Stop here: p is nil, so the deferred p.Close() would panic.
		log.Fatalln(err)
	}
	defer p.Close()

	strKey := "key: "
	srcValue := "testKafka: test message, index=%d"
	log.Println("start")
	for i := 0; i < 10000; i++ {
		value := fmt.Sprintf(srcValue, i)
		msg := &sarama.ProducerMessage{
			Key:   sarama.StringEncoder(strKey),
			Topic: "kafkatest", // the topic created in 3.1
			Value: sarama.ByteEncoder(value),
		}
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			log.Println(err, value, part, offset)
		}
	}
	log.Println("end")
}
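One thing to keep in mind about the demo: every message is sent with the same key. If the client picks the partition by hashing the key (which, as far as I know, is what sarama does by default when a key is set), then all messages end up in a single one of the five partitions. The sketch below only illustrates the hash-then-modulo idea with the standard library. The function partitionFor is made up for this example and is not copied from sarama.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition by hashing it (FNV-1a, 32-bit)
// and taking the result modulo the partition count.
// Illustrative only; not the code of any client library.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// The constant key from the demo always maps to the same partition.
	for i := 0; i < 3; i++ {
		fmt.Println("constant key ->", partitionFor("key: ", 5))
	}
	// Keys that vary per message can spread across partitions.
	for i := 0; i < 5; i++ {
		key := fmt.Sprintf("key-%d", i)
		fmt.Println(key, "->", partitionFor(key, 5))
	}
}
```

If the goal is to measure throughput across all partitions, using a key that varies per message (or no key at all) is probably the more representative test.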

4.1 Time taken for 10,000 and 100,000 messages

10,000 messages:

100,000 messages:

Still very fast!
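The demo only logs "start" and "end", so the duration has to be read off the log timestamps. A minimal sketch of measuring it directly could look like the following. To keep it runnable without a broker, the send function here is a stand-in (fakeSend, errStub, timeSends, and perSecond are all names invented for this example). In the real demo the callback would wrap p.SendMessage.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStub is a placeholder failure returned by the stand-in sender.
var errStub = errors.New("stub send failure")

// timeSends calls send n times and returns the elapsed time in seconds
// together with the number of calls that returned an error.
func timeSends(n int, send func(i int) error) (float64, int) {
	start := time.Now()
	failed := 0
	for i := 0; i < n; i++ {
		if err := send(i); err != nil {
			failed++
		}
	}
	return time.Since(start).Seconds(), failed
}

// perSecond converts a message count and elapsed seconds into messages per second.
func perSecond(n int, elapsedSeconds float64) float64 {
	if elapsedSeconds <= 0 {
		return 0
	}
	return float64(n) / elapsedSeconds
}

func main() {
	// Hypothetical stand-in for p.SendMessage so the sketch runs without Kafka.
	fakeSend := func(i int) error {
		if i%1000 == 999 {
			return errStub
		}
		return nil
	}
	for _, n := range []int{10000, 100000} {
		secs, failed := timeSends(n, fakeSend)
		fmt.Printf("n=%d elapsed=%.6fs failed=%d rate=%.0f msg/s\n",
			n, secs, failed, perSecond(n, secs))
	}
}
```

The numbers this prints say nothing about Kafka itself, since nothing is sent; only the measuring pattern carries over.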

5. Writing a consumer

package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"ip:9092"}, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("1")
	partitionList, err := consumer.Partitions("test-golang-kafka")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("2")
	fmt.Println(partitionList)

	var wg sync.WaitGroup
	// Iterate over all partitions (the partition IDs, not the slice indexes).
	for _, partition := range partitionList {
		// Create a partition consumer for each partition.
		pc, err := consumer.ConsumePartition("test-golang-kafka", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		wg.Add(1) // one Add per goroutine, so every Done() is accounted for
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
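The consumer starts one goroutine per partition and waits for all of them with a sync.WaitGroup. The pattern only works if Add is called once for every goroutine that later calls Done. The sketch below shows just that pattern with the standard library: plain channels stand in for the partition consumers, and drainAll is a name invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// drainAll starts one goroutine per channel, reads until each channel is
// closed, and returns the total number of messages seen.
// (Hypothetical helper, for illustration only.)
func drainAll(parts []<-chan string) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for _, ch := range parts {
		wg.Add(1) // one Add per goroutine, before it starts
		go func(ch <-chan string) {
			defer wg.Done()
			for range ch {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(ch)
	}
	wg.Wait() // returns once every goroutine has called Done
	return total
}

func main() {
	// Buffered, pre-filled channels stand in for partition consumers.
	a := make(chan string, 2)
	a <- "m1"
	a <- "m2"
	close(a)
	b := make(chan string, 1)
	b <- "m3"
	close(b)
	fmt.Println(drainAll([]<-chan string{a, b}))
}
```

Unlike these channels, pc.Messages() in the real consumer stays open until the partition consumer is closed, so wg.Wait() there blocks until the program is stopped.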