instructions

Rocketmq-spring consumer properties can be configured in two ways:

  1. Perform the configuration in the configuration file
  2. Configure the attributes on the class using the @RocketmqMessagelistener annotation

For attributes in annotations, see: Org. Apache. Rocketmq. Spring. The annotation. RocketMQMessageListener, and only the following can be configured in the file attributes (does not comply with the spring boot automatically configure the specification, so there will not be prompt in idea)

The instructions are as follows:

If you want to set some initialization parameters such as the maximum number of retries, it is obviously not supported.

At the same time, if you look at the source code for constructing consumer, you can see that when constructing a consumer instance, only a few fixed properties are configured:

    private void initRocketMQPushConsumer(a) throws MQClientException {
 
        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
        boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
            consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }
 
        String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
        if(customizedNameServer ! =null) {
            consumer.setNamesrvAddr(customizedNameServer);
        } else {
            consumer.setNamesrvAddr(nameServer);
        }
        if(accessChannel ! =null) {
            consumer.setAccessChannel(accessChannel);
        }
        consumer.setConsumeThreadMax(consumeThreadMax);
        if (consumeThreadMax < consumer.getConsumeThreadMin()) {
            consumer.setConsumeThreadMin(consumeThreadMax);
        }
        consumer.setConsumeTimeout(consumeTimeout);
 
        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }
 
        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }
 
        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }
 
        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceofRocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); }}Copy the code

But on the last few lines of code, rocketMQListener if implemented RocketMQPushConsumerLifecycleListener interface, Will call RocketMQPushConsumerLifecycleListener prepareStart (consumer) method, obviously, consuemr parameters can be set here.

RocketMQListener is a RocketMQMessageListener bean on the class.

The solution

    @RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_topic_consumer", selectorExpression = "*")
    class StringConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
 
        @Override
        public void onMessage(String message) {
            LOGGER.info("receive message: {}", message);
        }
 
        @Override public void prepareStart(DefaultMQPushConsumer consumer) {
            // Set the maximum number of retries
            consumer.setMaxReconsumeTimes(5);
            // Set other consumer-related properties as follows
            consumer.setPullBatchSize(16); }}Copy the code

At the end of the language

Realized that this solution is in the source code, I think now that provide the interface for custom configuration, the official document should have the sample, and then turned over the dead simple, there is some way of using, there are other examples on the source, if you have other questions, Suggestions or whether the official sample to provide the related solutions. Github address: github.com/apache/rock…