The environment
- Spring Cloud Alibaba
- Docker
- ZooKeeper
- Kafka
Reference
blog.csdn.net/yuanlong122…
The installation
Pull the images
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
Run ZooKeeper
docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper
Note: many tutorials run the container with docker run -i, which can cause errors when you connect later; use -itd instead.
Run Kafka
docker run -itd --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka:latest

Parameters:
- KAFKA_ZOOKEEPER_CONNECT: the ZooKeeper path under which Kafka is managed, e.g. 10.9.44.11:2181/kafka
- KAFKA_ADVERTISED_HOST_NAME: the host address that Kafka registers with ZooKeeper
- KAFKA_ADVERTISED_PORT: the port that Kafka registers with ZooKeeper
Note: -itd is used here as well; otherwise the Kafka container may fail to start.
Once both containers are running, use docker exec -it kafka /bin/bash to enter the Kafka container, go to the Kafka directory, and create a topic with the following command:
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic topic1
Note: if Kafka has only one server, the --replication-factor setting must be less than 2.
Instead of creating the topic manually, you can let Kafka create it automatically when the first message is sent with kafkaTemplate.send("topic1", normalMessage); in that case, however, the topic is created with a single partition and no replicas by default.
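If you need more partitions or replicas than the auto-created default, Spring Kafka can declare the topic as a bean and have it created at startup. A minimal sketch, assuming Spring Boot auto-configures a KafkaAdmin (the class name and counts here are illustrative, not from the original article):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin creates this topic at startup if it does not exist:
    // 2 partitions, replication factor 1 (matching the CLI command above).
    @Bean
    public NewTopic topic1() {
        return new NewTopic("topic1", 2, (short) 1);
    }
}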
Add the dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configuration instructions
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # Number of times to retry sending a message after an error
      retries: 0
      # When several messages need to go to the same partition, the producer puts
      # them in one batch. This sets the memory available to a batch, in bytes.
      batch-size: 16384
      # Size of the producer's memory buffer
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0: the producer does not wait for any response from the server before
      #   considering the message written successfully.
      # acks=1: the producer gets a success response as soon as the partition leader
      #   receives the message.
      # acks=all: the producer gets a success response only after all participating
      #   replicas have received the message.
      acks: 1
      # Use a custom partitioner:
      # properties:
      #   partitioner.class: org.example.config.CustomizePartitioner
    consumer:
      # Interval between automatic offset commits
      auto-commit-interval: 1S
      # What the consumer does when it reads a partition with no committed offset,
      # or when the offset is invalid:
      # latest (default): start from the newest records (those produced after the
      #   consumer started)
      # earliest: read the partition's records from the beginning
      auto-offset-reset: latest
      # Whether to commit offsets automatically; defaults to true. To avoid duplicate
      # and lost data, set it to false and commit offsets manually.
      enable-auto-commit: false
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Default consumer group
      group-id: defaultConsumerGroup
      # Batch consumption: at most 50 records per poll
      # max-poll-records: 50
    listener:
      # Number of threads running in the listener container
      concurrency: 5
      # Commit the offset immediately every time ack.acknowledge() is called
      ack-mode: manual_immediate
      missing-topics-fatal: false
Practice
General Message Push
Producer
@RestController
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("pushMessage")
    public void pushMessage(@RequestParam(required = false) String message) {
        kafkaTemplate.send("topic1", message);
    }
}
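With the application running you can push a test message from the command line, e.g. curl "http://localhost:8080/kafka/pushMessage?message=hello" (the port 8080 is illustrative and depends on your server configuration).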
Consumer
@Component
public class KafkaConsumer {

    /**
     * @description: TODO consumer listener
     * errorHandler can name a method to call when consumption fails
     * containerFactory can apply a custom message filter
     * @SendTo("topic2") forwards the message
     * @author shock wave 2
     * @date
     * @version 1.0
     */
    @KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory")
    public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        // The second parameter is the manual-ack handle
        // Print which topic and partition the message came from, and its content
        System.out.println("Simple consumption: " + record.topic() + "-" + record.partition() + "-" + record.value());
        // Commit the offset manually
        ack.acknowledge();
    }
}
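The @SendTo forwarding mentioned in the comment looks roughly like the sketch below (inside the same consumer class; the method name and payload are illustrative). Note that forwarding also requires a reply template on the container factory, e.g. factory.setReplyTemplate(kafkaTemplate), which the configuration shown later does not include:

// Consume from topic1 and forward the return value to topic2
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String forward(ConsumerRecord<?, ?> record) {
    return record.value() + " - forwarded";
}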
Message push with callback
/**
 * @description: TODO message push with callback
 * @author shock wave 2
 * @date
 * @version 1.0
 */
@GetMapping("pushCallBackMessageV1")
public void pushCallBackMessage(@RequestParam(required = false) String message) {
    kafkaTemplate.send("topic1", message).addCallback(success -> {
        // Topic the message was sent to
        String topic = success.getRecordMetadata().topic();
        // Partition the message was sent to
        int partition = success.getRecordMetadata().partition();
        // Offset of the message within the partition
        long offset = success.getRecordMetadata().offset();
        System.out.println("Message sent successfully: " + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("Message send failed: " + failure.getMessage());
    });
}
Transaction message
/**
 * @description: TODO transactional message
 * @author shock wave 2
 * @date
 * @version 1.0
 */
@GetMapping("pushTransactionMessage")
public void pushTransactionMessage(@RequestParam(required = false) String message) {
    // Inside the transaction, the exception thrown after send() rolls the send back,
    // so the message is never delivered.
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("topic1", message);
        throw new RuntimeException("fail");
    });
    // Without a transaction, the message below would still be delivered
    // even though an exception follows:
    // kafkaTemplate.send("topic1", message);
    // throw new RuntimeException("fail");
}
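executeInTransaction only works against a transactional producer, which the article does not show how to set up. One way to enable it, as a sketch (the class name KafkaTxConfig and the "tx-" prefix are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaTxConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // transactions require acks=all
        props.put(ProducerConfig.RETRIES_CONFIG, 3);  // and retries > 0
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
        // Setting a transaction id prefix is what enables transactions;
        // the Spring Boot property equivalent is spring.kafka.producer.transaction-id-prefix.
        factory.setTransactionIdPrefix("tx-");
        return factory;
    }
}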
Custom message filters
/**
 * @author shock wave 2
 * @version 1.0
 * @description: TODO Kafka message filter
 * @date 2021/8/3 16:19
 */
@Component
public class KafkaConfigConsumer {

    @Autowired
    private ConsumerFactory consumerFactory;

    // Message filter
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // Filtered messages are discarded (and acked)
        factory.setAckDiscarded(true);
        // Filtering strategy: returning true filters the record out,
        // returning false lets it through to the listener
        factory.setRecordFilterStrategy(consumerRecord -> {
            System.out.println(consumerRecord);
            if (consumerRecord.value().toString().contains("22")) {
                return false;
            }
            return true;
        });
        return factory;
    }
}
The consumer specifies the filter with the following annotation:
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
Custom partitioning
/**
 * @author shock wave 2
 * @version 1.0
 * @description: TODO custom partitioner. Routing rules when sending a message:
 * ① If a partition is specified when sending, the message is routed to that partition.
 * ② If no partition is specified but a key is given (Kafka allows one key per message),
 *   the key is hashed and the result decides the partition, so all messages with the
 *   same key go to the same partition.
 * ③ If neither partition nor key is specified, Kafka's default partitioning strategy
 *   picks a partition.
 * @date 2021/8/3 16:07
 */
public class CustomizePartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // Custom partitioning rule
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
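The three routing rules map directly onto KafkaTemplate.send overloads; a small sketch (the topic, key, and partition values are illustrative):

// ① Explicit partition: the message goes to partition 0 regardless of key
kafkaTemplate.send("topic1", 0, "key1", message);
// ② Key only: the partition is derived from the key's hash
//    (or from a custom partitioner such as CustomizePartitioner, if configured)
kafkaTemplate.send("topic1", "key1", message);
// ③ Neither: the default partitioning strategy picks a partition
kafkaTemplate.send("topic1", message);

The partitioner itself is registered through the partitioner.class producer property shown, commented out, in the configuration file above.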
Custom exception handling
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("Consumption error: " + message.getPayload());
        return null;
    };
}
The consumer uses the following annotation to customize how a consumption failure is handled:
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory",errorHandler="consumerAwareErrorHandler")
Batch consumption
The configuration file
# Enable batch consumption
listener:
  type: batch
# Consume at most 50 records per batch
consumer:
  max-poll-records: 50

For the remaining settings, see the configuration file above.
Consumer
/**
* @description: TODO receive the batch of messages with a List
* @author shock wave 2
* @date
* @version 1.0
*/
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
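Since the global ack-mode is manual_immediate, a batch listener will usually also take the manual-ack handle. A minimal sketch in the same consumer class, assuming the configuration above:

@KafkaListener(topics = "topic1")
public void onBatchMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    try {
        records.forEach(record -> System.out.println(record.value()));
    } finally {
        // One acknowledge() commits the whole batch
        ack.acknowledge();
    }
}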
Other
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {@TopicPartition(topic = "topic1", partitions = { "0" }),@TopicPartition(topic = "topic2", partitions = "0", PartitionOffsets = @partitionOffset (partition = "1", initialOffset = "8"))}) * 1 * ② groupId: consumer groupId; * ③ Topics: topics that can be listened on * (4) topicPartitions: more detailed listener information can be configured, and topic, parition, offset listener can be specified.Copy the code