Hands-on with Kafka

This time we use the Java API to send and consume messages.

One producer, one consumer

Start ZooKeeper and three brokers

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server-1.properties
> bin/kafka-server-start.sh config/server-2.properties

Create the chat topic with 3 partitions and a replication factor of 3

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chat

Add the Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

Write the producer code: it sends 10 messages to the chat topic

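import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Sender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all"); // wait until all in-sync replicas acknowledge the write
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send 10 messages; key and value are both the loop index
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("chat", Integer.toString(i), Integer.toString(i)));
        producer.close(); // flushes pending records before shutting down
    }
}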

Write the consumer code. It is very simple: it prints every record it consumes

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Receiver {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "chat-room-1"); // consumer group ID
        props.setProperty("enable.auto.commit", "true"); // commit offsets automatically
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("chat")); // subscribe to the chat topic

        // Poll forever and print every record received
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

Start the consumer first and then the producer. The consumer prints 10 records

offset = 8, key = 0, value = 0
offset = 9, key = 2, value = 2
offset = 10, key = 3, value = 3
offset = 11, key = 9, value = 9
offset = 4, key = 4, value = 4
offset = 5, key = 6, value = 6
offset = 8, key = 1, value = 1
offset = 9, key = 5, value = 5
offset = 10, key = 7, value = 7
offset = 11, key = 8, value = 8

You can see that the messages are not in global order: they are distributed across three partitions, and the single consumer reads from all three. Order is preserved only within each partition.
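The effect can be reproduced without a broker. The following is a minimal sketch that models a topic as one append-only list per partition. The class name KeyedPartitionSketch and its methods are hypothetical, and key.hashCode() is only a stand-in for the hash the Kafka client applies to the serialized key (murmur2), so the exact key-to-partition mapping differs from the real output above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a topic: one append-only list per partition.
class KeyedPartitionSketch {

    // Stand-in for Kafka's key hashing; NOT the real murmur2-based mapping.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends keys "0".."count-1" and returns the per-partition logs.
    static List<List<String>> send(int count, int numPartitions) {
        List<List<String>> logs = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            logs.add(new ArrayList<>());
        }
        for (int i = 0; i < count; i++) {
            String key = Integer.toString(i);
            logs.get(partitionFor(key, numPartitions)).add(key);
        }
        return logs;
    }

    public static void main(String[] args) {
        List<List<String>> logs = send(10, 3);
        // Read partition by partition; the index in each list plays the role of the offset.
        for (int p = 0; p < logs.size(); p++) {
            List<String> log = logs.get(p);
            for (int offset = 0; offset < log.size(); offset++) {
                System.out.printf("partition = %d, offset = %d, key = %s%n", p, offset, log.get(offset));
            }
        }
    }
}
```

Each partition keeps its own offset sequence, so keys are ascending inside a partition while the combined output jumps between them, which matches the shape of the consumer output above.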

Multi-consumer consumption

Since this topic has three partitions, we can start three consumers (in the same consumer group) and run the producer again. This time the messages are split among the three consumers, and no message is delivered twice

consumer-0:
offset = 4, key = 0, value = 0
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 3
offset = 7, key = 9, value = 9

consumer-1:
offset = 4, key = 1, value = 1
offset = 5, key = 5, value = 5
offset = 6, key = 7, value = 7
offset = 7, key = 8, value = 8

consumer-2:
offset = 2, key = 4, value = 4
offset = 3, key = 6, value = 6

If we start four consumers, let's see what happens

consumer-0:
offset = 16, key = 0, value = 0
offset = 17, key = 2, value = 2
offset = 18, key = 3, value = 3
offset = 19, key = 9, value = 9

consumer-1:

consumer-2:
offset = 16, key = 1, value = 1
offset = 17, key = 5, value = 5
offset = 18, key = 7, value = 7
offset = 19, key = 8, value = 8

consumer-3:
offset = 8, key = 4, value = 4
offset = 9, key = 6, value = 6

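There are only 3 partitions, and within a consumer group each partition is assigned to at most one consumer. So at most 3 consumers can work at the same time, and the extra one (consumer-1 in this run) receives nothing.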
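A rough way to picture this is to hand out partitions to the group members in turn. The sketch below is an illustration only: GroupAssignmentSketch and assign are hypothetical names, and the real assignment is done by the group's configured assignor, so which consumer ends up idle can differ (consumer-1 in the run above, the last consumer in this sketch).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of sharing a topic's partitions inside one consumer group.
class GroupAssignmentSketch {

    // Returns, for each consumer index, the list of partitions it owns.
    // Every partition goes to exactly one consumer.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Try group sizes 1 to 4 against a 3-partition topic.
        for (int consumers = 1; consumers <= 4; consumers++) {
            List<List<Integer>> assignment = assign(3, consumers);
            for (int c = 0; c < consumers; c++) {
                System.out.printf("%d consumer(s): consumer-%d -> partitions %s%n",
                        consumers, c, assignment.get(c));
            }
        }
    }
}
```

With 3 partitions and 4 consumers, one list stays empty: there is no partition left to give to the fourth member, so adding consumers beyond the partition count does not add throughput.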

Ordered messages

If we want messages to be delivered and consumed in order, we can send them all to a single partition by specifying the partition when sending

producer.send(new ProducerRecord<String, String>("chat", 1, Integer.toString(i), Integer.toString(i))); // The second argument specifies partition = 1

We start two consumers

consumer-0:

consumer-1:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9

As a result, only one consumer (the one that owns partition 1) receives the messages, and as you can see, they arrive in order.
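The reason every record lands in the same place is the order in which the target partition is decided: a partition set on the record wins, otherwise the key is hashed, and records with neither are spread by the client. Below is a minimal sketch of that precedence. PartitionChoiceSketch and choosePartition are hypothetical names, key.hashCode() stands in for Kafka's real key hash, and the round-robin fallback is a simplification of what the client does for records without a key.

```java
// Models how a target partition is picked for a record.
class PartitionChoiceSketch {

    // Counter used only for records with no partition and no key.
    private static int roundRobin = 0;

    static int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // a partition set on the record always wins
        }
        if (key != null) {
            // Stand-in hash; the real client hashes the serialized key with murmur2.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        // Simplified fallback for records without a key.
        return Math.floorMod(roundRobin++, numPartitions);
    }

    public static void main(String[] args) {
        // Same ten keys as the producer above, all pinned to partition 1.
        for (int i = 0; i < 10; i++) {
            String key = Integer.toString(i);
            System.out.printf("key = %s -> partition %d%n", key, choosePartition(1, key, 3));
        }
    }
}
```

Pinning everything to one partition buys ordering at the cost of parallelism: only the consumer that owns that partition does any work, as the output above shows.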