In this third article on Kafka, we’ll cover how to read from and write to Kafka in Python. We’ll use a third-party library called kafka-python, which you can install with pip or Pipenv. Either of the following two commands works.

python3 -m pip install kafka-python
pipenv install kafka-python

In this article, we will implement an example of reading and writing Kafka with the shortest possible code.

Creating a Configuration File

Since both the producer and the consumer need to connect to Kafka, I put the connection parameters in a separate configuration file, config.py.

# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'

The Kafka cluster used in this demonstration was set up by our platform team and requires a username and password to connect, which is why the configuration file also contains USERNAME and PASSWORD. If your Kafka has no username or password, you only need SERVER and TOPIC.

Creating a Producer

The code is so simple that it barely needs explaining. We first connect to Kafka with the KafkaProducer class to get a producer object, then write data into it.

import json
import time
import datetime
import config
from kafka import KafkaProducer


# Serialize each message as JSON before sending it to Kafka.
producer = KafkaProducer(bootstrap_servers=config.SERVER,
                         value_serializer=lambda m: json.dumps(m).encode())

# Write 100 messages, one per second.
for i in range(100):
    data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(config.TOPIC, data)
    time.sleep(1)

The bootstrap_servers parameter specifies the address of the Kafka server to connect to.

The value_serializer parameter specifies how values are serialized. Here I serialize with JSON, so I can pass a dictionary to producer.send() and it is automatically converted into a JSON string.

To connect with a username and password, I added four more parameters when creating the producer:

security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD

These four parameters are needed because this Kafka cluster requires authentication. If your Kafka does not use a password, you can leave them out.
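
For completeness, here is a minimal sketch of what the producer looks like with the authentication parameters added. It reuses the same config.py as above; if your cluster uses a different SASL mechanism, adjust accordingly:

import json
import config
from kafka import KafkaProducer

# A producer that authenticates with a username and password (SASL/PLAIN).
producer = KafkaProducer(
    bootstrap_servers=config.SERVER,
    value_serializer=lambda m: json.dumps(m).encode(),
    security_protocol="SASL_PLAINTEXT",   # plaintext transport plus SASL authentication
    sasl_mechanism="PLAIN",               # simple username/password mechanism
    sasl_plain_username=config.USERNAME,
    sasl_plain_password=config.PASSWORD,
)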

Creating a Consumer

The consumer also needs to connect to Kafka. First initialize a consumer object with the KafkaConsumer class, then iterate over it to read the data. The code is as follows:

import config
from kafka import KafkaConsumer


consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

The first parameter to KafkaConsumer specifies the Topic; you can think of the Topic as roughly analogous to a key in Redis.

bootstrap_servers specifies the Kafka server connection address.

The string passed as group_id can be anything you like. If two programs use the same Topic and the same group_id, they share the work: no message is read by both of them. If two programs use the same Topic but different group_ids, each one independently consumes all of the data, without affecting the other.
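
For example, here is a sketch of two consumers reading the same Topic but in different groups (the group names crawler and analytics are made up for illustration). Each of these two consumers receives every message; if both used the same group_id instead, the messages would be split between them:

from kafka import KafkaConsumer
import config

# Same Topic, different groups: each group independently receives all messages.
consumer_a = KafkaConsumer(config.TOPIC, bootstrap_servers=config.SERVER,
                           group_id='crawler')
consumer_b = KafkaConsumer(config.TOPIC, bootstrap_servers=config.SERVER,
                           group_id='analytics')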

The auto_offset_reset parameter takes one of two values, “earliest” or “latest”; if it is omitted, the default is “latest”. This parameter is easy to misunderstand, so I will explain it separately later in this article.

Once connected to Kafka, iterating over the consumer object with a for loop continuously fetches data from it.
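
Note that msg.value in the loop above is the raw bytes of the JSON string the producer wrote. If you want dictionaries back directly, you can pass a value_deserializer that mirrors the producer's value_serializer. A minimal sketch, with the authentication parameters omitted:

import json
import config
from kafka import KafkaConsumer

# Decode each message's bytes back into the dictionary the producer sent.
consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode()))

for msg in consumer:
    print(msg.value['num'], msg.value['ts'])  # msg.value is now a dict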

Running the Demo

Run two consumer programs and one producer program.

You will see that the two consumer programs read the data with no duplication and no omission.

When all the data has been consumed, if you close both consumer programs and run one again, you will find that no data will be printed.

However, if you change the group_id, the program will start consuming from the beginning again.

A Few Things Many People Get Confused About

earliest and latest

When we created the consumer object, we passed a parameter auto_offset_reset='earliest'. Some people think that “earliest” means the consumer reads from the very beginning of the Topic, while “latest” means it only reads data written after the program starts.

This view is not correct.

The auto_offset_reset parameter only takes effect the first time a group runs. From the second run onward, it has no effect.

Suppose your Topic already contains 100 messages, and you set a new group_id, test2, with auto_offset_reset set to “earliest”. When your consumer runs, Kafka sets the group’s offset to 0 and lets you consume from the very beginning.

Suppose your Topic already contains 100 messages, and you set a new group_id, test3, with auto_offset_reset set to “latest”. When your consumer runs, Kafka returns no data and the consumer appears stuck, because Kafka treats the existing 100 messages (offsets 0 through 99) as already consumed by your group. The consumer reads nothing until the producer inserts a new message; that new message gets offset 100 and is the first one your consumer receives.

Suppose your Topic already contains 100 messages, and you set a new group_id, test4, with auto_offset_reset set to “earliest”. When your consumer runs, Kafka sets the offset to 0 and you consume from the beginning. After reading message 50, you stop the consumer, change auto_offset_reset to “latest”, and run it again. The consumer will continue from message 51; the remaining 50 messages are not skipped.

So the function of auto_offset_reset is to give your group an initial offset the first time it runs. Once the group has an offset, the auto_offset_reset parameter is no longer used.
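
To make this concrete, here is a sketch using a brand-new group name (test2 is just an example). The auto_offset_reset value only matters the very first time this group_id connects; on later runs the group’s committed offset takes over and the parameter is ignored:

from kafka import KafkaConsumer
import config

# First run of a brand-new group: 'earliest' starts it at offset 0, so it reads
# all existing messages. On later runs with the same group_id, the committed
# offset is used instead and this parameter has no effect.
consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test2',
                         auto_offset_reset='earliest')

for msg in consumer:
    print(msg.value)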

How are partitions allocated?

For the same Group in the same Topic:

Let’s say your Topic has 10 partitions and you start with only 1 consumer. Then the consumer will take turns reading data from each of the 10 partitions.

When you start a second consumer, Kafka takes five Partitions away from the first consumer and gives them to the second, so the two consumers each read 5 Partitions, without affecting each other.

When a third consumer appears, Kafka takes one Partition from the first consumer and two Partitions from the second and gives them to the third. Consumer 1 now has 4 Partitions, consumer 2 has 3, and consumer 3 has 3, and they do not affect each other.

When you have 10 consumers consuming together, each consumer reads a Partition without affecting each other.

When an 11th consumer appears, it cannot read anything, because it has no Partition allocated to it.

That is why, in the previous article, I said that within the same Topic and Group, the number of Partitions determines how many processes can consume data simultaneously.
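
If you want to observe this allocation yourself, kafka-python lets a consumer report which Partitions it currently owns. A minimal sketch; note the assignment is only populated after the consumer has joined the group, for example after a poll:

from kafka import KafkaConsumer
import config

consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test')

# Poll once so the group coordinator assigns Partitions to this consumer.
consumer.poll(timeout_ms=1000)

print(consumer.partitions_for_topic(config.TOPIC))  # all Partitions of the Topic
print(consumer.assignment())                        # Partitions owned by this consumer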

Does Kafka never duplicate or lose data?

In extreme cases Kafka can duplicate or lose data, but such extreme cases are not common. If your Kafka frequently loses data, or constantly delivers duplicates, then either your environment was not set up properly or there is a problem with your code.

Advice

Again: professional people should do professional things, so don’t build your own Kafka cluster. Let dedicated colleagues build and maintain it, and simply use it. That is the most efficient way.