1. What is Confluent Kafka?
Confluent Platform is a streaming data platform capable of organizing and managing data from many different sources in a stable and efficient way. It provides not only the system for data transmission, but also all the surrounding tooling: tools for connecting to data sources, building applications, and receiving data. Confluent is software built on Kafka; it comes in an enterprise edition with a 30-day trial and an open source (community) edition.
Here we use the open source version of Confluent-Kafka, which includes the following components:
For the full list of components and the free trial, see: www.confluent.io/download
For an introduction to what each component does, see: www.cnblogs.com/dadadecheng…
2. Setting up Confluent Kafka
2.1 Installing the latest Confluent Kafka version
yum install -y curl which
rpm --import https://packages.confluent.io/rpm/5.3/archive.key
Configure the yum repository:
cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF
Install Confluent Platform using only Confluent Community components:
yum clean all && yum install confluent-community-2.12 -y
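A quick, optional sanity check that the packages actually landed:
rpm -qa | grep confluent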
2.2 Installing JDK 1.8
Download the JDK from the Oracle official website, upload it to the server, and run the following command to install it:
rpm -ivh jdk-8u231-linux-x64.rpm
Oracle jdk1.8 download page: www.oracle.com/technetwork…
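After installing, confirm the JDK is on the PATH:
java -version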
2.3 Configuring ZooKeeper
vim /etc/kafka/zookeeper.properties
Cluster configuration:
dataDir=/data/zookeeper/data/
dataLogDir=/data/zookeeper/logs/
maxClientCnxns=500
clientPort=2181
initLimit=5
syncLimit=2
server.1=xxx:2888:3888
server.2=xxx:2888:3888
server.3=xxx:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
Note: zoo1, zoo2, and zoo3 are host names; their entries in /etc/hosts were configured in advance.
Echo "1" > / data/zookeeper/data / # myid please perform on the zoo1 machine echo "2" > / data/zookeeper/data / # myid please perform on the zoo2 machine echo "3" > / data/zookeeper/data / # myid please perform on the zoo3 machine revCopy the code
Set directory permissions:
chmod 777 -R /data/zookeeper/logs/
chmod 777 -R /data/zookeeper/data/
Enable ZooKeeper at boot and start it:
systemctl enable confluent-zookeeper && systemctl start confluent-zookeeper
Check the status
systemctl status confluent-zookeeper
See the log
tail -f /var/log/messages
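With all three nodes started, a quick way to confirm the ensemble has formed is ZooKeeper's four-letter-word commands. This is only a sketch: it assumes nc is installed and that these commands have not been disabled in the bundled ZooKeeper version.
echo srvr | nc zoo1 2181
echo srvr | nc zoo2 2181
echo srvr | nc zoo3 2181
One node should report Mode: leader and the other two Mode: follower.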
2.4 Configuring Kafka
vim /etc/kafka/server.properties    # modify the following two options
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
broker.id.generation.enable=true
Enable Kafka at boot and start it:
systemctl enable confluent-kafka && systemctl start confluent-kafka
Check the status
systemctl status confluent-kafka
See the log
tail -f /var/log/messages
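Before moving on, it is worth checking that all three brokers registered themselves in ZooKeeper. A minimal check, assuming the zookeeper-shell tool installed with the Confluent packages is on the PATH:
zookeeper-shell zoo1:2181 ls /brokers/ids
The output should end with a list containing one id per broker.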
3. Testing Kafka message production and consumption
3.1 Creating a topic
kafka-topics --create --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --replication-factor 1 --partitions 5 --topic kafkatest
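To confirm the topic came up with the expected partition count and replica placement, describe it:
kafka-topics --describe --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest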
3.2 Producing messages
kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest
3.3 Consuming messages
kafka-console-consumer --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest --from-beginning
4. Using a Go demo to test how long producing messages takes
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	var address = []string{"ip1:9092", "ip2:9092", "ip3:9092"}
	producer(address)
}

func producer(address []string) {
	// Return.Successes must be true for a SyncProducer, so SendMessage can report partition/offset.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second

	p, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Close()

	strKey := "key: "
	srcValue := "testKafka: test message, index=%d"

	log.Println("start")
	// Send 10000 messages synchronously; the timestamps on the "start" and "end" log lines bracket the loop.
	for i := 0; i < 10000; i++ {
		value := fmt.Sprintf(srcValue, i)
		msg := &sarama.ProducerMessage{
			Key:   sarama.StringEncoder(strKey),
			Topic: "kafkatest",
			Value: sarama.ByteEncoder(value),
		}
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			log.Println(err, value, part, offset)
		}
	}
	log.Println("end")
}
4.1 Time consumed for 10,000 and 100,000 messages
10,000 messages:
100,000 messages:
Still very fast!
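The elapsed time can be read straight from the producer's own output: the standard library logger timestamps every line, so the difference between the "start" and "end" entries is the time spent in the send loop. To time an entire run from the shell instead (this includes compilation, and assumes the demo above is saved as producer.go):
time go run producer.go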
5. Writing a consumer
package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"ip:9092"}, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer consumer.Close()
	fmt.Println("1")

	// Look up all partitions of the topic.
	partitionList, err := consumer.Partitions("test-golang-kafka")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("2")
	fmt.Println(partitionList)

	var wg sync.WaitGroup
	// Iterate over all partitions and create a partition consumer for each one.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("test-golang-kafka", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// Print every message received on this partition.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
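To see the consumer print anything, messages need to be flowing into the topic it subscribes to. One simple way is to reuse the console producer from section 3.2 (this assumes the test-golang-kafka topic already exists or that automatic topic creation is enabled on the brokers):
kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic test-golang-kafka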