Sending messages
1. How the producer sends messages
The new Kafka producer sends messages asynchronously. Two threads are involved in sending a message: the main thread and the send thread, which runs as a daemon thread. The two threads share a buffer, the RecordAccumulator. The main thread creates messages and places them in the RecordAccumulator; the send thread takes messages out of that buffer and sends them to the Kafka broker.
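The two-thread model described above can be sketched with plain JDK classes. This is only an illustration, not Kafka's implementation: `TwoThreadSketch`, the `STOP` marker, and the use of a `LinkedBlockingQueue` as a stand-in for the RecordAccumulator are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoThreadSketch {
    // Hypothetical marker that tells the sender thread to stop; not part of Kafka.
    private static final String STOP = "__stop__";

    /** Buffers `count` messages on the calling thread and returns what the sender thread delivered. */
    public static List<String> run(int count) {
        // Stand-in for RecordAccumulator: a thread-safe buffer shared by both threads.
        BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
        // Stand-in for the broker: collects every message the sender thread "sends".
        List<String> delivered = new ArrayList<>();

        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String message = accumulator.take(); // blocks until a message is buffered
                    if (message.equals(STOP)) {
                        return;
                    }
                    delivered.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sender-sketch");
        sender.setDaemon(true); // the send thread runs as a daemon thread
        sender.start();

        // Main-thread side: create messages and buffer them without waiting for delivery.
        for (int i = 0; i < count; i++) {
            accumulator.add("message-" + i);
        }
        accumulator.add(STOP);

        try {
            sender.join(); // after join, the sender's writes to `delivered` are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the sender", e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The point of the design is that the thread creating messages only ever touches the buffer, so it is not slowed down by network round trips to the broker.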
2. The sending process in detail
2.1 The main thread
- Wrap the message in a ProducerRecord object and call the send method to send it
- Pass the message through the producer interceptors (message format checks, etc.)
- Update the Kafka cluster metadata
- Serialize the message object into a byte array
- Use the partitioner to compute the target partition
- Append the message to the shared RecordAccumulator
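The main-thread steps listed above can be sketched as a small pipeline. This is a simplified model under stated assumptions: `MainThreadSketch` and its methods are hypothetical names, the "interceptor" only trims whitespace, the partition count stands in for cluster metadata, and the hash is `Arrays.hashCode` rather than the murmur2 hash that Kafka's default partitioner applies to the key bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class MainThreadSketch {
    // Stand-in for cluster metadata: how many partitions the topic has.
    private final int numPartitions;
    // Stand-in for RecordAccumulator: one queue of serialized values per partition.
    private final Map<Integer, Deque<byte[]>> accumulator = new HashMap<>();

    public MainThreadSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Interceptor step (hypothetical check): normalize the value before it is sent. */
    public static String intercept(String value) {
        return value.trim();
    }

    /** Serialization step: turn the object into a byte array. */
    public static byte[] serialize(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Partitioning step: non-negative hash of the key bytes modulo the partition count. */
    public static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    /** Runs all steps in order and returns the partition the message was buffered for. */
    public int send(String key, String value) {
        String checked = intercept(value);
        byte[] keyBytes = serialize(key);
        byte[] valueBytes = serialize(checked);
        int partition = partitionFor(keyBytes, numPartitions);
        accumulator.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(valueBytes);
        return partition;
    }

    /** Number of messages currently buffered for a partition. */
    public int buffered(int partition) {
        Deque<byte[]> queue = accumulator.get(partition);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        MainThreadSketch sketch = new MainThreadSketch(3);
        int partition = sketch.send("order-1", " hello ");
        System.out.println("buffered for partition " + partition);
    }
}
```

Because the partition is derived from the key, messages with the same key always land in the same partition queue.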
2.2 The send thread
- Starts working when KafkaProducer is instantiated
- Retrieves messages from the RecordAccumulator and processes the message format
- Builds the send request object (request)
- Hands the request to the Selector and puts it on the request queue
- On receiving the response, removes the request from the request queue and invokes the callback function of each message
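The send-thread steps above can be sketched as a single-threaded simulation. Everything here is an assumption made for illustration: `SenderLoopSketch`, `Pending`, the in-flight queue, and the offsets handed out by the simulated broker are hypothetical and do not mirror Kafka's internal classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

public class SenderLoopSketch {
    /** A buffered message plus the callback to run once the broker has answered. */
    public static final class Pending {
        final String value;
        final BiConsumer<Long, Exception> callback; // receives (offset, error)

        Pending(String value, BiConsumer<Long, Exception> callback) {
            this.value = value;
            this.callback = callback;
        }
    }

    private final Deque<Pending> accumulator = new ArrayDeque<>();
    // Requests that were handed over for sending and still wait for a response.
    private final Deque<List<Pending>> inFlight = new ArrayDeque<>();
    // Hypothetical offsets assigned by the simulated broker.
    private long nextOffset = 0;

    public void append(String value, BiConsumer<Long, Exception> callback) {
        accumulator.addLast(new Pending(value, callback));
    }

    /** One pass of the send thread: drain the buffer, build a request, queue it. */
    public void sendOnce() {
        if (accumulator.isEmpty()) {
            return;
        }
        List<Pending> request = new ArrayList<>(accumulator);
        accumulator.clear();
        inFlight.addLast(request);
    }

    /** Simulates a successful response for the oldest in-flight request. */
    public void onResponse() {
        List<Pending> request = inFlight.pollFirst(); // remove it from the request queue
        if (request == null) {
            return;
        }
        for (Pending pending : request) {
            pending.callback.accept(nextOffset++, null); // one callback per message
        }
    }

    public int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        SenderLoopSketch sender = new SenderLoopSketch();
        sender.append("a", (offset, error) -> System.out.println("a acknowledged at offset " + offset));
        sender.append("b", (offset, error) -> System.out.println("b acknowledged at offset " + offset));
        sender.sendOnce();
        sender.onResponse();
    }
}
```

Callbacks run only when the response arrives, which is why a callback receives either the record metadata or an exception, never both.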
3. Message sending code example
3.1 Project dependency references
Note that the Maven dependency version you reference should match the Kafka version installed on your server.
<!-- Kafka messaging service -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
3.2 Understanding related classes
- KafkaProducer: creates the producer object that sends data
- ProducerConfig: defines, as constants, the basic configuration keys needed to connect to Kafka
- ProducerRecord: each piece of data must be wrapped in a ProducerRecord object before it can be sent
3.3 Simple message sending demo
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProductTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties prop = new Properties();
        // Kafka broker address
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.126:9092");
        // Acknowledgement mechanism
        prop.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        prop.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size (bytes)
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Wait time (ms) before a batch is sent
        prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Buffer size (bytes)
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33356444);
        // Key serializer
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(prop);
        /* Asynchronous send with a callback: an exception means the send failed, no exception means it succeeded
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("test", i + "", i + ""), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("message sent successfully -> " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        */
        // Synchronous send: get() blocks until the broker acknowledges the message
        for (int i = 0; i < 100; i++) {
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))
            ).get();
            System.out.println(metadata.offset());
        }
        // Close the producer
        producer.close();
    }
}
3.4 Message Viewing
After running the code above, you can log in to the server and view the messages you just produced with the following command:
./kafka-console-consumer.sh --bootstrap-server 192.168.10.126:9092 --from-beginning --topic test
4. Retry mechanism
Receiving messages
1. Consumer-related classes
- KafkaConsumer: creates the object that consumes messages
- ConsumerConfig: defines the configuration items used when instantiating the consumer object
- ConsumerRecords: each piece of data is wrapped in this object before it can be consumed
Note that this demo does not close the consumer after creating it: the polling loop runs indefinitely and keeps retrieving messages from the message server. An application that shuts down its consumer should still call close().
2. Demo: consuming messages from the server
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.126:9092");
        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Commit offsets automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Consumer group ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1205");
        // Automatic commit interval (ms)
        props.put("auto.commit.interval.ms", "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> str : records) {
                System.out.println(String.format("topic = %s, offset= %s ,value= %s", str.topic(), str.offset(), str.value()));
            }
        }
    }
}
3. Offset commit modes
3.1 Automatic commit
Enable it through the consumer properties:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("auto.commit.interval.ms", "1000");
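How the commit interval interacts with polling can be illustrated with a small simulation. This is a simplified model, assuming that an automatic commit is triggered from within a poll once the configured interval has elapsed; `AutoCommitSketch`, its fields, and the explicit clock parameter are hypothetical and exist only for this example.

```java
public class AutoCommitSketch {
    private final long intervalMs;   // plays the role of auto.commit.interval.ms
    private long lastCommitAtMs = 0; // simulated time of the last automatic commit
    private long position = 0;       // next offset the consumer will read
    private long committed = 0;      // offset a restarted consumer would resume from

    public AutoCommitSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Simulates one poll at time `nowMs` that returns `fetched` records. */
    public void poll(long nowMs, int fetched) {
        // Commit the position reached by earlier polls once the interval has elapsed.
        if (nowMs - lastCommitAtMs >= intervalMs) {
            committed = position;
            lastCommitAtMs = nowMs;
        }
        position += fetched;
    }

    public long committed() {
        return committed;
    }

    public long position() {
        return position;
    }

    public static void main(String[] args) {
        AutoCommitSketch consumer = new AutoCommitSketch(1000);
        consumer.poll(0, 10);
        consumer.poll(500, 10);
        consumer.poll(1000, 5);
        System.out.println("position=" + consumer.position() + ", committed=" + consumer.committed());
    }
}
```

In this model the committed offset trails the consumer's position between commits, so records read after the last commit would be read again after a restart.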
3.2 Manual commit
Manual commit requires disabling automatic commit (enable.auto.commit set to false) and calling a commit method explicitly in the code:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> str : records) {
        System.out.println(String.format("topic = %s, offset= %s ,value= %s", str.topic(), str.offset(), str.value()));
    }
    // Manually commit the offsets
    consumer.commitAsync();
}
3.3 Differences between manual and automatic commit
- In a real production environment, manual commit is generally preferred because it keeps message consumption under control. The offset is committed each time messages are consumed, so even if the consumer or the message service crashes, consumption resumes after a restart from the last committed offset. The Java KafkaConsumer stores committed offsets in Kafka's internal __consumer_offsets topic; only the old ZooKeeper-based consumers kept them in ZK. Resuming from the committed offset avoids losing messages and keeps reconsumption to a minimum
- Manually committed offsets advance in order, which helps ensure that the data is processed in order
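The resume-from-committed-offset behavior can be sketched without a broker. This is a toy model under stated assumptions: `ResumeSketch`, the in-memory log, and the `crashAt` parameter are hypothetical, and the committed offset is treated as the offset of the next record to read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ResumeSketch {
    /**
     * Processes the log starting at `committed`, committing after every record.
     * Stops just before offset `crashAt` to simulate a crash (use -1 for no crash).
     * Returns the committed offset, i.e. the next offset to read after a restart.
     */
    public static long consume(List<String> log, long committed, long crashAt, List<String> processed) {
        for (long offset = committed; offset < log.size(); offset++) {
            if (offset == crashAt) {
                return committed; // crash: nothing beyond the last commit is recorded
            }
            processed.add(log.get((int) offset));
            committed = offset + 1; // manual commit after the record has been handled
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        List<String> processed = new ArrayList<>();
        long committed = consume(log, 0, 2, processed);  // crashes before offset 2
        committed = consume(log, committed, -1, processed); // restart from the committed offset
        System.out.println(processed + " committed=" + committed);
    }
}
```

Committing only after a record has been handled is what makes the restart safe in this model: the second run starts exactly where the first one stopped.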