
This article focuses on cheddar’s MessageHandlerExecutor



public class MessageHandlerExecutor extends ThreadPoolExecutor { public MessageHandlerExecutor(final String queueName, final int numThreads) { super(numThreads, numThreads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new MessageHandlerWorkerThreadFactory(queueName)); }}Copy the code

MessageHandlerExecutor inherits ThreadPoolExecutor and uses a queue called LinkedBlockingQueue. ThreadFactory for MessageHandlerWorkerThreadFactory



public class MessageHandlerWorkerThreadFactory implements ThreadFactory { private final AtomicInteger threadSequenceNumber = new AtomicInteger(); private final String queueName; public MessageHandlerWorkerThreadFactory(final String queueName) { this.queueName = queueName; } @Override public Thread newThread(final Runnable r) { final int seq = threadSequenceNumber.incrementAndGet(); return new Thread(r, "MessageHandler:" + queueName + ":" + seq); }}Copy the code

MessageHandlerWorkerThreadFactory ThreadFactory interface is achieved, is named for its newThread method USES queueName and threadSequenceNumber thread



public class MessageHandlerWorker<T extends Message> implements Runnable { private final Logger logger = LoggerFactory.getLogger(getClass()); private final T message; private final MessageHandler<T> messageHandler; private final PooledMessageListener<T> pooledMessageListener; public MessageHandlerWorker(final PooledMessageListener<T> pooledMessageListener, final T message, final MessageHandler<T> messageHandler) { this.message = message; this.messageHandler = messageHandler; this.pooledMessageListener = pooledMessageListener; } @Override public void run() { try { messageHandler.handle(message); } catch (final Exception e) { logger.error("Error handling message: " + message, e); } finally { try { pooledMessageListener.completeMessageProcessing(message); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); }}}}Copy the code

MessageHandlerWorker implements the Runnable interface. Its run method executes the MessageHandler. handle method. The final execution pleteMessageProcessing (the message)



public abstract class PooledMessageListener<T extends Message> implements MessageListener, Runnable {

     * Maximum number of messages to receive from the queue at a time. Using larger numbers decreases the number of
     * calls to receive and thus increases throughput, at the possible expense of latency.
    protected static final int DEFAULT_MAX_RECEIVED_MESSAGES = 10;

     * Controls when messages are received from the queue by setting an ideal minimum number of runnable tasks for each
     * thread. This minimum includes the currently executing tasks and those on the thread pool work queue. When the
     * number of runnable tasks dips below the ideal, more messages are received.
    protected static final int IDEAL_RUNNABLES_PER_THREAD = 2; // Each thread has 1 executing + 1 queued runnable

     * The default number of worker threads to use in a fixed size thread pool
    protected static final int DEFAULT_NUM_WORKER_THREADS = 10;

     * Maximum duration (in seconds) to wait for messages on the queue during normal processing. If there is at least
     * one message on the queue, the actual duration will be shorter.
    private static final int LONG_POLL_DURATION_SECONDS = 20;

     * Maximum duration (in seconds) to wait for messages on the queue during handing over to a new application instance
     * in a blue-green deployment. This is shorter to enable prompt termination of this message processor.
    private static final int SHORT_POLL_DURATION_SECONDS = 2;

     * Time (in milliseconds) to pause when receive message request returns an error
    private static final long RECEIVE_MESSAGE_ERROR_PAUSE_MILLIS = 500;

     * Maximum number of attempts to delete message from queue
    private static final int MAX_DELETE_MESSAGE_ATTEMPTS = 5;

     * Time (in milliseconds) to pause when delete message request returns an error
    private static final long DELETE_MESSAGE_ERROR_PAUSE_MILLIS = 1500;

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final MessageQueue<T> messageQueue;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final RateLimiter rateLimiter;
    private final Semaphore semaphore;
    private final int maxReceivedMessages;
    private volatile boolean started;
    private volatile boolean shutdownRequested;
    private volatile boolean shutdownRequestImminent;

    public PooledMessageListener(final MessageQueue<T> messageQueue, final RateLimiter rateLimiter,
            final ThreadPoolExecutor threadPoolExecutor, final Semaphore semaphore, final int maxReceivedMessages) {
        this.messageQueue = messageQueue;
        this.rateLimiter = rateLimiter;
        this.threadPoolExecutor = threadPoolExecutor;
        this.semaphore = semaphore;
        this.maxReceivedMessages = maxReceivedMessages;

    protected abstract MessageHandler<T> getHandlerForMessage(T message);

    protected abstract void listenerStarted();

    public void start() {
        new Thread(this).start();

    public void run() {
        try {
            started = true;
            final String limiterSummary = rateLimiter != null ? ("using " + rateLimiter.toString())
                    : "not rate limited";
            logger.debug(String.format("Listener for queue [%s] has pool of %d threads and is %s", queueName(),
                    threadPoolExecutor.getMaximumPoolSize(), limiterSummary));
        } catch (final InterruptedException e) {
        } catch (final Throwable e) {
            logger.error(e.getMessage(), e);
            throw e;
        } finally {
                    "Message listener for queue [%s] has stopped receiving messages. Initiating shutdown of task executor",

    private void processMessagesUntilShutdownRequested() throws InterruptedException {
        while (!shutdownRequested) {
            // Block until there is capacity to handle up to maxReceivedMessages
            List<T> messages = Collections.emptyList();
            try {
                if (!shutdownRequested) {
                    final int pollSeconds = shutdownRequestImminent ? SHORT_POLL_DURATION_SECONDS
                            : LONG_POLL_DURATION_SECONDS;
                    try {
                        messages = messageQueue.receive(pollSeconds, maxReceivedMessages);
                    } catch (final MessageReceiveException e) {
                        logger.warn("Error receiving messages on queue:[" + queueName() + "]", e);
            } finally {
                // Release over-allocated permits
                semaphore.release(maxReceivedMessages - messages.size());
            for (final T message : messages) {
                processMessage(message); // Must complete processing each message to release permit

     * Processes a message by getting the appropriate message handler and scheduling a task to execute the handler. In
     * case of problems (e.g. the message cannot be parsed), the message processing is completed to ensure the message
     * is deleted from the queue and the associated permit is released.
     * @param message {@link Message} to process
    private void processMessage(final T message) throws InterruptedException {
        boolean workerAssigned = false;
        try {
            final MessageHandler<T> messageHandler = getHandlerForMessage(message);
            if (messageHandler != null) {
                threadPoolExecutor.execute(new MessageHandlerWorker<T>(this, message, messageHandler));
                workerAssigned = true;
        } catch (final Exception e) {
            logger.error("Unable to process received message", e);

        if (!workerAssigned) {

     * Completes message processing by deleting it from the queue and releasing the associated permit.
     * @param message {@link Message} to complete processing
    public void completeMessageProcessing(final T message) throws InterruptedException {

    private void deleteMessage(final T message) throws InterruptedException {
        for (int attempts = 0; attempts < MAX_DELETE_MESSAGE_ATTEMPTS; attempts++) {
            try {
            } catch (final MessageDeleteException e) {
                logger.warn(String.format("Failed attempt to delete message with id [%s] from queue [%s]",
                        message.getMessageId(), queueName()), e);
        logger.error(String.format("Failed all attempts to delete message with id [%s] from queue [%s]",
                message.getMessageId(), queueName()));

    private void applyRateLimiter() {
        if (rateLimiter != null) {
            try {
            } catch (final InterruptedException e) {

    protected String queueName() {
        return messageQueue.getName();

    public void prepareForShutdown() {
                "Message listener for queue [%s] is preparing for imminent shutdown. Reducing queue poll time.",
        shutdownRequestImminent = true;

    public void shutdownListener() {
        logger.debug(String.format("Message listener for queue [%s] is shutting down", queueName()));
        shutdownRequested = true;
        if (!started) {

    public boolean awaitShutdownComplete(final long timeoutMillis) {
        boolean terminated = false;
        try {
            terminated = threadPoolExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            if (terminated) {
                logger.debug(String.format("Message listener for queue [%s] shutdown has completed", queueName()));
            } else {
                        "Message listener for queue [%s] has not shutdown as message handler worker threads have not completed",
        } catch (final InterruptedException e) {
        return terminated;
Copy the code

PooledMessageListener run method tag first started to true, then execute processMessagesUntilShutdownRequested; Circulation messageQueue processMessagesUntilShutdownRequested method. The receive (pollSeconds maxReceivedMessages) pull messages; ProcessMessage is then iterated over; The processMessage method first gets the messageHandler and then creates the MessageHandlerWorker and puts it into the thread pool


Cheddar’s MessageHandlerExecutor inherits ThreadPoolExecutor and uses LinkedBlockingQueue; ThreadFactory MessageHandlerWorkerThreadFactory; MessageHandlerWorker implements the Runnable interface. Its run method executes the MessageHandler. handle method. The final execution pleteMessageProcessing (message).


  • Cheddar