I. Background
Stream is a data type introduced in Redis 5. This article uses Spring Boot Data Redis to consume data from a Redis Stream, covering both independent consumption and consumer group consumption.
II. Integration steps
1. Introduce jar packages
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.11.1</version>
    </dependency>
</dependencies>
These are the main dependencies; other unrelated dependencies are omitted here.
2. Configure the RedisTemplate
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // Do not use JSON serialization here. When ObjectRecord is used to transfer objects, it may cause
        // java.lang.IllegalArgumentException: Value must not be null!
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}
Note:
Pay attention to the serializer passed to setHashValueSerializer. Details are discussed later.
3. Prepare an entity object
This entity object is the object that needs to be sent to the Stream.
@Getter
@Setter
@ToString
public class Book {

    private String title;
    private String author;

    public static Book create() {
        com.github.javafaker.Book fakerBook = Faker.instance().book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}
Each call to create() returns a new Book object whose data is generated with Javafaker.
4. Write a constant class and configure the Stream name
/** Constants */
public class Cosntants {
    public static final String STREAM_KEY_001 = "stream-001";
}
5. Write a producer that produces data into the Stream
1. Write a producer that generates ObjectRecord data into the Stream
/** Message producer */
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {

    private final RedisTemplate<String, Object> redisTemplate;

    public void sendRecord(String streamKey) {
        Book book = Book.create();
        log.info("Generated book: [{}]", book);

        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());

        RecordId recordId = redisTemplate.opsForStream()
                .add(record);

        log.info("Returned record-id: [{}]", recordId);
    }
}
2. Produce data to the Stream every 5s
/** Periodically produces messages to the Stream */
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {

    private final StreamProducer streamProducer;

    @Override
    public void run(ApplicationArguments args) {
        // Initial delay 0, then one message every 5 seconds
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}
III. Independent consumption
Independent consumption means consuming messages from a Stream directly, without a consumer group, by reading with the XREAD command. Messages are not deleted from the Stream after they are read. If several programs read with XREAD at the same time, each of them can read every message.
1. Consume from the beginning with XREAD
This implementation starts consuming from the first message in the Stream.
package com.huan.study.redis.stream.consumer.xread;
import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** Consumes Stream data directly, without a consumer group, and reads every message in the Stream. */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
private ThreadPoolExecutor threadPoolExecutor;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private volatile boolean stop = false;
@Override
public void afterPropertiesSet() {
// Initialize the thread pool
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("xread-nonblock-01");
return thread;
});
StreamReadOptions streamReadOptions = StreamReadOptions.empty()
// If there is no data, block for up to 1s; this must be shorter than the timeout configured by `spring.redis.timeout`
.block(Duration.ofMillis(1000))
// Block until data is retrieved; a timeout exception may be reported
// .block(Duration.ofMillis(0))
// Get 10 data at a time
.count(10);
StringBuilder readOffset = new StringBuilder("0-0");
threadPoolExecutor.execute(() -> {
while (!stop) {
// When reading with XREAD, record the ID of the last entry read and use it as the offset of the next read; otherwise data may be missed
List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
.read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
if (CollectionUtils.isEmpty(objectRecords)) {
log.warn("No data was obtained.");
continue;
}
for (ObjectRecord<String, Book> objectRecord : objectRecords) {
log.info("id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
readOffset.setLength(0);
readOffset.append(objectRecord.getId());
}
}
});
}

@Override
public void destroy() throws Exception {
stop = true;
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
}
Note:
The offset passed to the next read must be the ID of the last entry obtained; otherwise data may be missed.
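To make "the last obtained ID" concrete: Redis Stream entry IDs have the form `<milliseconds>-<sequence>`, and a read that starts at a given ID returns only entries whose ID is strictly greater. The following is a minimal sketch in plain Java, with no Redis connection involved. The class `StreamIdOrder` and its methods are hypothetical helpers written for this illustration; they are not part of Spring Data Redis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Spring Data Redis class): orders Redis Stream IDs of the form "<ms>-<seq>". */
final class StreamIdOrder {

    private StreamIdOrder() {
    }

    /** Compares two IDs numerically, first by the millisecond part, then by the sequence part. */
    static int compare(String left, String right) {
        long[] a = parse(left);
        long[] b = parse(right);
        int byTime = Long.compare(a[0], b[0]);
        return byTime != 0 ? byTime : Long.compare(a[1], b[1]);
    }

    /** Returns the IDs a read starting at the given offset would see: those strictly greater than it. */
    static List<String> after(List<String> sortedIds, String offset) {
        List<String> result = new ArrayList<>();
        for (String id : sortedIds) {
            if (compare(id, offset) > 0) {
                result.add(id);
            }
        }
        return result;
    }

    private static long[] parse(String id) {
        int dash = id.indexOf('-');
        if (dash < 0) {
            // An ID without a sequence part is treated as "<ms>-0" in this sketch.
            return new long[] {Long.parseLong(id), 0L};
        }
        return new long[] {Long.parseLong(id.substring(0, dash)), Long.parseLong(id.substring(dash + 1))};
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1636600000000-0", "1636600000000-1", "1636600005000-0");
        // Start from the beginning, as the consumer above does with "0-0".
        System.out.println(after(ids, "0-0"));
        // Continue from the last ID seen in the previous batch.
        System.out.println(after(ids, "1636600000000-1"));
    }
}
```

The comparison is numeric on both parts, which is why the ID is kept as returned by Redis instead of being compared as a plain string.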
2. Independent consumption with StreamMessageListenerContainer
See the consumer group code below.
IV. Consumer group consumption
1. Implement the StreamListener interface
This interface is used to consume the data in the Stream. Note whether the listener is registered with streamMessageListenerContainer.receiveAutoAck() or with streamMessageListenerContainer.receive(). With the latter, messages must be acknowledged manually, for example: redisTemplate.opsForStream().acknowledge("key", "group", "recordId");
/** Consumes messages asynchronously via a listener. @author huan.fu 2021/11/10 - 5:51 PM */
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
/** * Types of consumers: independent consumption, consumer group consumption */
private String consumerType;
/** * Consumer group */
private String group;
/** * A consumer in the consumer group */
private String consumerName;
public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, Book> message) {
String stream = message.getStream();
RecordId id = message.getId();
Book value = message.getValue();
if (StringUtils.isBlank(group)) {
log.info("[{}]: received a message stream:[{}], ID :[{}],value:[{}]", consumerType, stream, id, value);
} else {
log.info("[{}] group:[{}] consumerName:[{}] received a message stream:[{}], id:[{}], value:[{}]", consumerType,
group, consumerName, stream, id, value);
}
// If auto-ack is not used, acknowledge manually here
// redisTemplate.opsForStream()
// .acknowledge("key", "group", "recordId");
}
}
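The difference between receive() and receiveAutoAck() can be pictured as what remains in the consumer group's list of pending (delivered but unacknowledged) entries. The sketch below is a plain-Java, in-memory model written for this illustration only; `PendingEntriesModel` and its methods are hypothetical names, and nothing here talks to Redis or uses a Spring API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical in-memory model of a consumer group's pending entries; not Redis and not a Spring API. */
final class PendingEntriesModel {

    private final Set<String> pending = new LinkedHashSet<>();

    /** Delivering a message to a consumer in the group marks it as pending until it is acknowledged. */
    void deliver(String recordId, boolean autoAck) {
        pending.add(recordId);
        if (autoAck) {
            // Models receiveAutoAck(): the message does not remain pending after it is received.
            acknowledge(recordId);
        }
    }

    /** Models a manual acknowledge; returns true if the ID was pending. */
    boolean acknowledge(String recordId) {
        return pending.remove(recordId);
    }

    /** Returns a copy of the IDs that are still waiting for an acknowledge. */
    Set<String> pending() {
        return new LinkedHashSet<>(pending);
    }

    public static void main(String[] args) {
        PendingEntriesModel group = new PendingEntriesModel();
        group.deliver("1636600000000-0", false); // registered with receive()
        group.deliver("1636600005000-0", true);  // registered with receiveAutoAck()
        System.out.println("Pending before manual ack: " + group.pending());
        group.acknowledge("1636600000000-0");
        System.out.println("Pending after manual ack: " + group.pending());
    }
}
```

In this model, forgetting the manual acknowledge after receive() leaves the entry pending indefinitely, which is the situation the commented-out acknowledge call in the listener is meant to avoid.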
2. Handle errors raised while fetching or consuming messages
/** Handles exceptions raised by the StreamPollTask or by the listener while consuming a message. @author huan.fu 2021/11/11 - 3:44 PM */
@Slf4j
public class CustomErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        log.error("An exception occurred.", t);
    }
}
3. Consumer group configuration
/** Redis Stream consumer group configuration. @author huan.fu 2021/11/11 - 12:22 PM */
@Configuration
public class RedisStreamConfiguration {
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
 * Supports both independent consumption and consumer group consumption.
 * <p>
 * Consumers can be added and removed dynamically.
 * <p>
 * Consumer groups must be created in advance.
 *
 * @return StreamMessageListenerContainer
 */
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// Maximum number of messages fetched per poll
.batchSize(10)
// Executor that runs the Stream poll task
.executor(executor)
// Serializer for the Stream key
.keySerializer(RedisSerializer.string())
// Serializer for the field names of a Stream entry
.hashKeySerializer(RedisSerializer.string())
// Serializer for the field values of a Stream entry
.hashValueSerializer(RedisSerializer.string())
// How long to block when the Stream has no messages; must be shorter than `spring.redis.timeout`
.pollTimeout(Duration.ofSeconds(1))
// For ObjectRecord: converts the fields and values of an object into a Map, e.g. a Book object into a Map
.objectMapper(new ObjectHashMapper())
// Handles exceptions raised while fetching messages or while handing them to the listener
.errorHandler(new CustomErrorHandler())
// Converts each Record read from the Stream into an ObjectRecord of the type specified here
.targetType(Book.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// Independent consumption
String streamKey = Cosntants.STREAM_KEY_001;
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new AsyncConsumeStreamListener("Independent consumption", null, null));
// Consumer group A, no automatic ack
// Start from messages that have not yet been delivered to any consumer in the group
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption", "group-a", "consumer-a"));
// Start from messages that have not yet been delivered to any consumer in the group
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption A", "group-a", "consumer-b"));
// Consumer group B, automatic ack
streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption B", "group-b", "consumer-bb"));
// To customize a consumer further, pass a StreamReadRequest object to the register method
return streamMessageListenerContainer;
}
}
Note:
Create the consumer groups in advance:
127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
OK
1. Independent consumption configuration
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("Independent consumption", null, null));
Do not pass a Consumer.
2. Consumer group configuration, without automatic ack
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption A", "group-a", "consumer-b"));
1. Note the value of ReadOffset.
2. Note that the group must be created in advance.
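Both notes can be illustrated with a small in-memory model: a consumer group keeps one shared cursor, each read for new messages hands out entries that no consumer in that group has received yet, and reading from a group that was never created fails. This is only a sketch under simplifying assumptions (list positions stand in for entry IDs, and pending entries are ignored); `GroupCursorModel` and its methods are hypothetical and do not exist in Redis or Spring Data Redis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of how a consumer group hands out entries; not Redis itself. */
final class GroupCursorModel {

    private final List<String> entries = new ArrayList<>();
    // One shared cursor per group: index of the next entry not yet delivered to that group.
    private final Map<String, Integer> nextIndexByGroup = new HashMap<>();

    void add(String value) {
        entries.add(value);
    }

    /** Creates the group so that it starts after the entries that already exist (like "$" in xgroup create). */
    void createGroup(String group) {
        nextIndexByGroup.put(group, entries.size());
    }

    /** Models a read for undelivered messages: returns up to count entries and advances the group's cursor. */
    List<String> readNew(String group, int count) {
        Integer next = nextIndexByGroup.get(group);
        if (next == null) {
            throw new IllegalStateException("No such consumer group: " + group);
        }
        int end = Math.min(entries.size(), next + count);
        List<String> batch = new ArrayList<>(entries.subList(next, end));
        nextIndexByGroup.put(group, end);
        return batch;
    }

    public static void main(String[] args) {
        GroupCursorModel stream = new GroupCursorModel();
        stream.add("book-0");           // written before the group exists
        stream.createGroup("group-a");
        stream.add("book-1");
        stream.add("book-2");
        stream.add("book-3");
        // Two consumers of the same group share the cursor, so they receive different entries.
        System.out.println("consumer-a: " + stream.readNew("group-a", 2));
        System.out.println("consumer-b: " + stream.readNew("group-a", 2));
    }
}
```

Because the cursor belongs to the group and not to an individual consumer, adding more consumers to a group spreads the messages across them instead of duplicating them.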
3. Consumer group configuration, with automatic ack
streamMessageListenerContainer.receiveAutoAck()
V. Serialization strategy
Stream Property | Serializer | Description |
---|---|---|
key | keySerializer | used for Record#getStream() |
field | hashKeySerializer | used for each map key in the payload |
value | hashValueSerializer | used for each map value in the payload |
VI. ReadOffset strategy
The read offset strategies for consuming messages:
Read offset | Standalone | Consumer Group |
---|---|---|
Latest | Read latest message | Read latest message |
Specific Message Id | Use last seen message as the next MessageId (reads messages with an ID greater than the specified one) | Use last seen message as the next MessageId (reads messages with an ID greater than the specified one) |
Last Consumed | Use last seen message as the next MessageId (reads messages with an ID greater than the specified one) | Last consumed message as per consumer group (reads messages that have not yet been delivered to any consumer in the group) |
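At the Redis command level these strategies are expressed as a token in the ID position of the read command: `$` means only entries added after the read starts, `>` (used with consumer groups) means entries never delivered to any consumer of the group, and an explicit ID means entries greater than that ID. In Spring Data Redis these correspond to ReadOffset.latest(), ReadOffset.lastConsumed() and ReadOffset.from(id). The sketch below only restates that mapping in plain Java; the `OffsetTokens` class and its `Kind` enum are hypothetical names invented for this illustration.

```java
/** Hypothetical illustration of the raw offset tokens; the names here are not Spring Data Redis types. */
final class OffsetTokens {

    enum Kind { LATEST, LAST_CONSUMED, SPECIFIC_ID }

    private OffsetTokens() {
    }

    /** Returns the token that appears in the ID position of the read command for the given strategy. */
    static String toRedisToken(Kind kind, String specificId) {
        switch (kind) {
            case LATEST:
                // Only entries that arrive after the read is issued.
                return "$";
            case LAST_CONSUMED:
                // Consumer groups: entries not yet delivered to any consumer in the group.
                return ">";
            case SPECIFIC_ID:
                if (specificId == null || specificId.isEmpty()) {
                    throw new IllegalArgumentException("A specific offset needs an entry ID");
                }
                // Entries with an ID greater than this one.
                return specificId;
            default:
                throw new IllegalStateException("Unknown offset kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(toRedisToken(Kind.LATEST, null));
        System.out.println(toRedisToken(Kind.LAST_CONSUMED, null));
        System.out.println(toRedisToken(Kind.SPECIFIC_ID, "0-0"));
    }
}
```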
VII. Notes
1. Timeout for reading messages
When using StreamReadOptions.empty().block(Duration.ofMillis(1000)), the blocking time must be shorter than the value configured for spring.redis.timeout; otherwise a timeout exception may be reported.
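One way to keep this rule from being broken by a later configuration change is to derive the blocking time from the command timeout instead of hard-coding both. The following is a minimal sketch in plain Java; `BlockTimeouts.clampBlock` and the safety margin are assumptions made for this illustration and are not part of Spring Data Redis. The command timeout passed in is assumed to be the value configured for spring.redis.timeout.

```java
import java.time.Duration;

/** Hypothetical helper: keeps the XREAD block time below the Redis command timeout. */
final class BlockTimeouts {

    private BlockTimeouts() {
    }

    /**
     * Returns the requested block duration, reduced if necessary so that it stays
     * at least {@code margin} below {@code commandTimeout}.
     */
    static Duration clampBlock(Duration requested, Duration commandTimeout, Duration margin) {
        Duration max = commandTimeout.minus(margin);
        if (max.isNegative() || max.isZero()) {
            throw new IllegalArgumentException("Command timeout must be longer than the margin");
        }
        return requested.compareTo(max) > 0 ? max : requested;
    }

    public static void main(String[] args) {
        Duration commandTimeout = Duration.ofSeconds(5); // assumed value of spring.redis.timeout
        Duration margin = Duration.ofMillis(500);        // assumed safety margin
        System.out.println(clampBlock(Duration.ofSeconds(1), commandTimeout, margin));
        System.out.println(clampBlock(Duration.ofSeconds(10), commandTimeout, margin));
    }
}
```

The resulting duration could then be passed to StreamReadOptions.empty().block(...) in place of the fixed 1000 ms.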
2. ObjectRecord deserialization error
If the following exception occurs when reading messages, troubleshoot as follows:
java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.util.Assert.notNull(Assert.java:201)
    at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
    at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
    at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
    at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
    at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
1. Check the HashValueSerializer configured on the RedisTemplate. RedisSerializer.string() is the recommended choice; avoid JSON serialization here.
2. Check the HashMapper configured in redisTemplate.opsForStream(). The default is ObjectHashMapper, which serializes the fields and values of the object into byte[].
3. A working configuration:
// The RedisTemplate hash value uses String serialization
redisTemplate.setHashValueSerializer(RedisSerializer.string());
// opsForStream() uses the default ObjectHashMapper
redisTemplate.opsForStream();
I reported this error as an issue in the official Spring Data Redis repository; the maintainers replied that it is a bug and would be fixed later.
3. Data missed when reading sequentially with XREAD
If we read data with XREAD and find that some written data is missing, check whether the StreamOffset used for the second read is valid. It should be the ID of the last entry returned by the previous read.
For example:
1. The StreamOffset passes $, which means read only the latest data.
2. The consumer processes the data read in the previous step; while it is still processing, another producer inserts more data into the Stream.
3. The consumer reads the Stream again, still passing $, which again means read only the latest data. It cannot read the data that entered the Stream in the previous step, because it only reads the most recent data.
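The three steps above can be replayed with a small in-memory model. This is only a sketch under simplifying assumptions: list positions stand in for entry IDs, `end()` stands in for what `$` resolves to at the moment of the call, and the class `DollarOffsetDemo` is a hypothetical name. No Redis or Spring API is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of reading with "$" versus reading from the last seen position. */
final class DollarOffsetDemo {

    private final List<String> entries = new ArrayList<>();

    void add(String value) {
        entries.add(value);
    }

    /** The position that "$" stands for at the time of the call: the current end of the stream. */
    int end() {
        return entries.size();
    }

    /** Returns every entry added after the given position. */
    List<String> readAfter(int position) {
        return new ArrayList<>(entries.subList(position, entries.size()));
    }

    public static void main(String[] args) {
        DollarOffsetDemo stream = new DollarOffsetDemo();
        stream.add("book-1");

        // Step 1: the first read passes "$" and receives the next entry that arrives.
        int dollar = stream.end();
        stream.add("book-2");
        List<String> first = stream.readAfter(dollar);
        int lastSeen = dollar + first.size();

        // Step 2: more entries arrive while the consumer is still processing.
        stream.add("book-3");
        stream.add("book-4");

        // Step 3: passing "$" again skips them; passing the last seen position does not.
        System.out.println("Second read with $: " + stream.readAfter(stream.end()));
        System.out.println("Second read from last seen: " + stream.readAfter(lastSeen));
    }
}
```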
4. Using StreamMessageListenerContainer
1. Consumers can be added and removed dynamically.
2. Consumer group consumption is supported.
3. Independent consumption is supported directly.
4. When transferring ObjectRecord, pay attention to the serialization method. Refer to the code above.
VIII. Complete code
gitee.com/huan1993/sp…
IX. Reference documents
1. docs.spring.io/spring-data…
2. github.com/spring-proj…