Experience Kafka
This time we use the Java API to send and consume messages
One-to-one delivery consumption
Start zK and three brokers
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server-1.properties
> bin/kafka-server-start.sh config/server-2.properties
Copy the code
Create the chat topic with partition and replica 3
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chat
Copy the code
Maven introduces dependencies
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
Copy the code
Write producer code, specify topic as chat, and send 10 messages
public class Sender {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers"."localhost:9092");
props.put("acks"."all");
props.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("chat".""+Integer.toString(i), Integer.toString(i))); producer.close(); }}Copy the code
Write consumer code, very simple, print out what you consume
public class Receiver {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers"."localhost:9092");
props.setProperty("group.id"."chat-room-1");// Consumer group ID
props.setProperty("enable.auto.commit"."true");
props.setProperty("auto.commit.interval.ms"."1000");
props.setProperty("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("chat"));/ / chat topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}}Copy the code
We can start a consumer and then a producer, and we can see the consumer output 10 pieces of data
offset = 8, key = 0, value = 0
offset = 9, key = 2, value = 2
offset = 10, key = 3, value = 3
offset = 11, key = 9, value = 9
offset = 4, key = 4, value = 4
offset = 5, key = 6, value = 6
offset = 8, key = 1, value = 1
offset = 9, key = 5, value = 5
offset = 10, key = 7, value = 7
offset = 11, key = 8, value = 8
Copy the code
You can see that the message is not ordered, because the message is distributed to three partitions, and the messager consumes three partitions
Multi-consumer consumption
Since we have three partitions for this topic, we can suspend three consumers (same consumer group) and create a new producer. This time we can see that the message is split among three consumers and there will be no duplicate messages
consumer-0:
offset = 4, key = 0, value = 0
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 3
offset = 7, key = 9, value = 9
consumer-1:
offset = 4, key = 1, value = 1
offset = 5, key = 5, value = 5
offset = 6, key = 7, value = 7
offset = 7, key = 8, value = 8
consumer-2:
offset = 2, key = 4, value = 4
offset = 3, key = 6, value = 6
Copy the code
If we take four customers, let’s see what happens
consumer-0:
offset = 16, key = 0, value = 0
offset = 17, key = 2, value = 2
offset = 18, key = 3, value = 3
offset = 19, key = 9, value = 9
consumer-1:
consumer-2:
offset = 16, key = 1, value = 1
offset = 17, key = 5, value = 5
offset = 18, key = 7, value = 7
offset = 19, key = 8, value = 8
consumer-3
offset = 8, key = 4, value = 4
offset = 9, key = 6, value = 6
Copy the code
There are only 3 partitions, so at most only 3 consumers work at the same time, and the extra one will not consume
Order news
If we want messages to be delivered and consumed in an orderly fashion, we can specify partitions when sending
producer.send(new ProducerRecord<String, String>("chat".1, Integer.toString(i), Integer.toString(i)));// The second parameter specifies partition=1
Copy the code
We have two consumers
consumer-0:
consumer-1:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9
Copy the code
As a result, only one consumer consumes the message, and as you can see, the message is in order