Starter that provides message queuing capabilities to Spring-Boot and provides a lightweight implementation of VM threads. Project address: github.com/wangyuheng/…

What is a message queue

Message queues are containers for messages that consumers can retrieve for consumption.

Observer model

The Observer pattern is defined as a one-to-many dependency between multiple objects. When an object’s state changes, all dependent objects are notified and automatically updated. This pattern, sometimes referred to as publish-subscribe or model-view, is an object behavior pattern.

An Observer originally means an Observer, but the implementation does not actively observe, but passively receives notifications from the Subject, so the more appropriate name would be “message delivery”. The downside of the notification pattern is that the notification and the multiple ConcreteObserver consumers are still in a synchronous thread, so it is only decouple at the level of the code structure, and still in a transaction at the bottom. To solve this problem, the sending of messages and the N consumers were split into N+1 transactions, so message queues were introduced to store subjects.

Domain model design

  • List domain concepts

  • Sorting out interaction relationships

Code implementation

  1. LinkedBlockingQueueAs a container for storing messages.
  2. Store is used to Store messages. In order to be compatible with multipleConsumer, eachConsumerSpecify a unique identifier asPartition KeyCorresponds to a unique oneLinkedBlockingQueue. e.g.Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();
  3. ProducerthroughTransportSend only multiple messagesPartition KeytheLinkedBlockingQueueIn the queue
  4. eachConsumerStart a thread polling fromLinkedBlockingQueueConsume messages in the queue.

Code snippet

  • VmStore
    private Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();

    @Override
    public void append(Message message, Partition partition) {
        initQueue(partition);
        messageQueueMap.get(partition).add(message);
    }

    @Override
    public LinkedBlockingQueue<Message> findByPartition(Partition partition) {
        initQueue(partition);
        return messageQueueMap.get(partition);
    }

    private void initQueue(Partition partition) {
        if(! messageQueueMap.containsKey(partition)) {synchronized (this) {
                if(! messageQueueMap.containsKey(partition)) { messageQueueMap.put(partition,newLinkedBlockingQueue<>()); }}}}Copy the code
  • Transport
public void transfer(Message message) {
    final String topic = message.getTopic();
    topicClientIdMap.get(topic).forEach(clientId -> {
        Partition partition = new Partition(clientId, topic);
        store.append(message, partition);
    });
}
Copy the code
  • ConsumerCluster
/** * Can only be started once */
public synchronized void start(Store store) {
    if(! initialized.get()) {synchronized (this) {
            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
            taskExecutor.setDaemon(true);
            taskExecutor.execute(new ConsumerListener(this.getMessageHandler(), store.findByPartition(this.generatePartition())));
            initialized.set(true); }}}/** * closes the consuming thread */
public void shutdown(a) {
    liveToggle.set(false);
}

/** * suspend consumption */
public void pause(a) {
    runToggle.set(false);
}

/** * Restart the paused consuming thread */
public void restart(a) {
    runToggle.set(true);
}

class ConsumerListener implements Runnable {

    private MessageHandler handler;
    private LinkedBlockingQueue<Message> queue;

    ConsumerListener(MessageHandler handler, LinkedBlockingQueue<Message> queue) {
        this.handler = handler;
        this.queue = queue;
    }

    @Override
    public void run(a) {
        while (true) {
            try {
                if(! liveToggle.get()) {break;
                }
                if (runToggle.get()) {
                    Message message = queue.poll();
                    if (null == message) {
                        Thread.sleep(100);
                    } else{ handler.handle(message); }}else {
                    Thread.sleep(100); }}catch(InterruptedException e) { Thread.currentThread().interrupt(); }}}}Copy the code

The reason for using LinkedBlockingQueue instead of the take method is to have more control over the start and stop of consuming threads.

Spring integration

For ease of use, integrate with the Spring framework in the form of annotations.

The sample

  • Consumer
@Consumer(topic = CONSUMER_TOPIC, id = CUSTOM_CONSUMER_ID)
public void consumerMessage(Message message) {
    consumerRecordMap.get(CUSTOM_CONSUMER_ID).add(message);
}
Copy the code
  • Producer
@Autowired
private DefaultProducer<String> producer;

public void sendMessage(a){
    producer.send(new Message<>(CUSTOM_TOPIC, "This is a message!"));
}
Copy the code

Code implementation

/** * Register consumer bean **@see Consumer
 * @see MessageHandler
 * @see Store
 */
public class ConsumerBeanDefinitionRegistryPostProcessor implements BeanPostProcessor.ApplicationContextAware {

    private ConfigurableApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<? > targetClass = AopProxyUtils.ultimateTargetClass(bean); Method[] methods = ReflectionUtils.getAllDeclaredMethods(targetClass);for (Method method : methods) {
            if (AnnotatedElementUtils.hasAnnotation(method, Consumer.class)) {
                final String topic = method.getAnnotation(Consumer.class).topic();
                final String id = StringUtils.isEmpty(method.getAnnotation(Consumer.class).id()) ? beanName + method.getName() : method.getAnnotation(Consumer.class).id();
                final BeanFactory beanFactory = applicationContext.getBeanFactory();
                final Store store = beanFactory.getBean(Store.class);

                final MessageHandler messageHandler = message -> ReflectionUtils.invokeMethod(method, bean, message);

                final BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerCluster.class, () -> {
                    ConsumerCluster consumerCluster = new ConsumerCluster();
                    consumerCluster.setId(id);
                    consumerCluster.setTopic(topic);
                    consumerCluster.setMessageHandler(messageHandler);
                    consumerCluster.start(store);
                    return consumerCluster;
                });
                BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
                ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(beanName + method.getName() + "Listener", beanDefinition); }}returnbean; }}Copy the code

other

  1. How to consume across applications? Store and Transport are replaced by public storage such as Mysql and Redis. Mysql needs to consider row locking.