The author of this article: Coke Coke, the author’s personal home page: Coke Coke personal home page

Due to the cost problem, the current limiting de-queue had to be used

My son (a new system developed by the school) actually had a financial problem when conducting a detailed demand analysis, which led to the mailbox we used may be hung up or the flow was limited. I shed tears for the NTH time because of the financial problem.

But this is also my chance to use this little wheel to catch the eye of the interviewer.

(Do you want to personally experience being blocked by QQ mailbox? It will not be very serious. After a short pause, it will recover.

import org.junit.jupiter.api.Test;
import org.kelab.aide.AideApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;

import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.concurrent.CountDownLatch;

/ * * *@author JirathLiu
 * @date 2021/1/24
 * @description: * /
@SpringBootTest(classes = AideApplication.class)
public class SpringMailTest {
    @Autowired
    JavaMailSender javaMailSender;
    @Value("${spring.mail.username}")
    String from;
    @Test
    void contextLoads(a) {
        SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
        simpleMailMessage.setFrom(from);
        simpleMailMessage.setTo("Your own QQ id @qq.com");
        simpleMailMessage.setSubject("Test");
        simpleMailMessage.setText("

Test Head?

"
); javaMailSender.send(simpleMailMessage); } String html = = "< table style \" width: 99.8%; \">test

"
; @Test void sendOneHtml(a){ MimeMessage message = javaMailSender.createMimeMessage(); try { MimeMessageHelper helper = new MimeMessageHelper(message, true); helper.setFrom(from); helper.setTo("Your own QQ id @qq.com"); helper.setText(html,true); helper.setSubject("Test"); javaMailSender.send(helper.getMimeMessage()); } catch (MessagingException e) { e.printStackTrace(); }finally { System.out.println("Finish"); }}private CountDownLatch countDownLatch = new CountDownLatch(500); @Test void sendBigNumberOfHtml(a) { for (int i = 0; i < 100; i++) { new Thread(()->{ MimeMessage message = javaMailSender.createMimeMessage(); try { MimeMessageHelper helper = new MimeMessageHelper(message, true); helper.setFrom(from); helper.setTo("Your own QQ id @qq.com"); helper.setText("

Test Head?

"
); helper.setSubject("Test"); javaMailSender.send(helper.getMimeMessage()); } catch (MessagingException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("Thread"+countDownLatch.getCount()); } }).start(); } try { countDownLatch.await(); System.out.println("Success"); } catch(InterruptedException e) { e.printStackTrace(); }}}Copy the code

Ok, let’s get down to business. We need to conduct requirements analysis first, and then consider the structure of the framework to improve the scalability of the framework step by step.

Demand analysis | give some money please

The demand is very simple: the school email may be because the school’s own SAO (power, and so on strange reason) lead to hang up, for lack of public access to the system, this is a fatal blow (depends on the stuff sent the message), change the QQ mailbox, netease mail, and will not be distinguished VIP customers due to the transmitting frequency constraints

So there are basically two directions to solve the problem:

  1. Give me some money
  2. Since one is not safe, open a backup, since one can CD, slowly let him send

All in all, it’s impossible to make money (how could such a great opportunity go to waste

So we have to be forced to study a little wheel

Relevant technical considerations

So with this analysis, I am troubled by two questions:

  1. demotion
  2. Current limiting

These two concepts have been encountered in microservices and are important.

Downgrading: When a service becomes unavailable, we should not continue to stress it. Instead, we should simply put a service back down or use an alternative, whatever logic, and wait some time before testing the original service.

Limiting traffic: a service can easily explode in response to a sudden increase in traffic, so a limiting operation is used to allow the target to hit the service at a constant rate.

For this service, there is a special problem. An email may contain some critical information that needs to be guaranteed not to be lost. That is, messages need to be transmitted reliably.

Possible solutions

  1. MQ solution, limiting flow is very flexible
  2. Try using Java concurrency tools

Ah, message queues are perfect for this, consumer producer mode, speed consumer control, Juice

But are we the kind of people who go through the motions? We’re not, we’re going to try to challenge ourselves, and I’m going to do one manually! (really owe to play

All right, let’s play, let’s play, let’s not joke about MQ (it’s really the most efficient option

So how do you achieve a downgrade?

We can flag an email account, flag it every time an exception occurs, and demote it when a certain number of failures occur

There is a question of how to recover.

Simply, we record the time when the last error occurred. If we exceed our preset time, we reset it. Otherwise, we add one

How to achieve traffic limiting?

At present, there are mainly two ways to limit the current (without the help of a third party).

  1. Limit the amount of access to a resource (think semaphore)
  2. Access frequency of target is obtained by computing resources, through to limit the results (in the Sentinel current-limiting QPS is the principle of strategy use, recommend an article: blog.csdn.net/qq924862077…

In practice, using QPS can control flow more evenly in most scenarios (thanks to the interviewer for asking this question)

However, we need to seriously consider the algorithm of QPS. Considering the current business (sending emails and sending by one thread will not cause problems after testing),

We can limit the number of threads directly, and use thread pools to achieve our expectations

Build our traffic limiting degradation framework

In order to ensure that our code is elegant, we need to understand some design patterns to ensure the extensibility of the framework, such as: template method design patterns.

To use thread pools, consider using Java’s built-in thread pool ExecutorService.

To make the most of the Spring Cloud architecture, we also need to consider dynamically modifying configurations. We need to use the ExecutorService subclass ThreadPoolExecutor, which provides interfaces to dynamically change configurations.

There are generally two ways to create a thread pool

ExecutorService executorService = Executors.newFixedThreadPool(5);
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(4.5.5L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new MailThreadFactory(),
                this::doThreadOverflow);
Copy the code

For these two methods, we use the second custom mode, which can better control the thread pool, set their own thread name, overflow policy and so on.

ThreadPoolExecutor threadPool= new ThreadPoolExecutor(1.2.5L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new MailThreadFactory(),
                this::doThreadOverflow);
/** * The production thread uses a reentrant lock to ensure that the thread has a unique increment id * optimizable point: the number of threads should not be too high, otherwise id duplication will occur (a small chance */)
private static class MailThreadFactory implements ThreadFactory{
    private volatile int index=0;
    private Lock lock = new ReentrantLock(true);

    @Override
    public Thread newThread(Runnable r) {
        lock.lock();
        try {
            Thread thread = new Thread(r);
            thread.setName("MAIL_THREAD-"+index);
            index++;
            return thread;
        }finally{ lock.unlock(); }}}/** * Thread pool overprocessing * discards this task by default, issues an error * The system normally does not overflow, there will be a policy to control the number *@param r
     * @param pool
     */
  protected void doThreadOverflow(Runnable r,ThreadPoolExecutor pool) {
        LOGGER.warn("Mail ThreadPool overflow, this shouldn't be happen normally.");
        LOGGER.warn("pool status: \bcore:{}\nmax:{}\nsize:{}",pool.getCorePoolSize(),pool.getMaximumPoolSize(),pool.getPoolSize());
    }
Copy the code

To abstract the Mail task, Mail account, we need to build two classes Mail, and MailNode

To increase the extensibility of the Mail class, we added two subclasses: HtmlMail and SimpleMail, which are used to distinguish between task types (it doesn’t matter, it’s about sending Mail, not about this framework)

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Mail {
    protected List<String> addresses;
    protected String subject;
    protected String content;
    protected String host ;
    protected String username ;
    protected String password ;
    protected String sender ;

    public boolean isLegal(a) {
        if (addresses==null||addresses.isEmpty()) {
            return false;
        }
        for (String s : addresses) {
            if (! MailCheckUtil.isMail(s)) {
                return false; }}return! StringBaseUtil.isStringListNull(subject,content,sender); }public boolean canSend(a){
        return isLegal()
                && !StringBaseUtil.isStringListNull(host,username,password);
    }
}


/ * * *@author JirathLiu
 * @date 2021/1/31
 * @description: * /
public class HtmlMail extends Mail{}/ * * *@author JirathLiu
 * @date 2021/1/31
 * @description: * /
public class SimpleMail extends Mail{}Copy the code

We abstract the mailbox resources, because the underlying plan is to use the Apache Mail module, so we need host, username, password, etc. To mark the mailbox state, we set the error times, and flag bit isShutdown(easy to query, AtomicInteger is a little slower.)

It then stores the time of the upload error and provides a lock to lock the node

Because of the design pattern standards, we need to implement two methods here to record the number of errors and so on, providing an interface for other class modifications

/** * Stores all mailbox nodes */
private class MailNode {
    String host ;
    String username ;
    String password ;
    // The number of failures
    AtomicInteger errTimes=new AtomicInteger(0);
    // Whether to close
    volatile boolean isShutdown=false;
    // Store the last lock time. If it passes, the lock will be cleared. If it fails, the lock will be extended
    AtomicLong lastErrTime=new AtomicLong(System.currentTimeMillis());
    Lock lock = new ReentrantLock(true);

    public MailNode(String host, String username, String password) {
        this.host = host;
        this.username = username;
        this.password = password;
    }

    public boolean isLocked(a) {
        if (isShutdown) {
            long time=lastErrTime.get()+mailHostWaitTime.get();
            if (time < System.currentTimeMillis()) {
                isShutdown=false;
                errTimes.set(0);
                return false;
            }else {
                return true; }}return false;
    }

    /** * record an error, if the time since the last error reached the required, reset to 1, otherwise add one */
    public void errRecord(a) {
        long time=lastErrTime.get()+mailHostWaitTime.get();
        if (time < System.currentTimeMillis()) {
            errTimes.set(1);
        }else {
            errTimes.addAndGet(1);
        }
        lastErrTime.set(System.currentTimeMillis());
        if (errTimes.get() >= maxErrTime) {
            isShutdown=true; }}public String getHost(a) {
        return host;
    }

    public String getUsername(a) {
        return username;
    }

    public String getPassword(a) {
        returnpassword; }}Copy the code

Then, in order to ensure the performance of the framework, there is no limit on the waiting queue, but the alarm is allowed when the length exceeds a certain length. We use the template method to design the mode to achieve the purpose

/** * Submit a task * If the free position reaches the threshold, raise an alert * lambda expression, internally super.send(org.kelab.framework.mail) *@param mail
 * @throws MailException
 */
private void submitSend(Mail mail) throws MailException {
    int size = getQueueSize();
    if (size < maxTaskNum.get()) {
        doWarn(threadPool,mails);
    }
    threadPool.submit(() -> doSend(mail));
}

/** * Warning processing *@param threadPool
 */
protected void doWarn(ThreadPoolExecutor threadPool,List<MailNode> mails) {}Copy the code

In order to monitor the mailbox state, we need to customize the thread pool tasks ourselves

/** ** If an exception occurs during system access, * records the number of errors and the last error time, and resends the task. * * Find the appropriate mailbox, traverse the number group, if available, then use, otherwise skip, if not available, then execute the alarm, and cache task, add a listener to wait for recovery. * * Each time a new error is recorded, * if the last error time is outside the specified time, it will be recorded again * otherwise add a record * * After successful execution, check whether the record needs to be obtained from Redis *@param mail
 * @throws MailException
 */
private void doSend(Mail mail) throws MailException {
    // Walk through the mailbox and select the first available mailbox to try
    for (MailNode mailNode : mails) {
        if(! mailNode.isLocked()) { copyAttrToMail(mailNode,mail);if (!super.send(mail)){
                mailNode.errRecord();
            }else{
                return; }}}// If no, check whether the pause recovery mode is enabled and cache records
    failedOperation(mail);
}
/** * Failed operation *@param mail
     */
    private void failedOperation(Mail mail){
        submitSend(mail);
    }
Copy the code

Add SpringCloud dynamic configuration refresh to the framework

To take advantage of Spring Cloud of dynamic configuration refresh, we implement two interfaces ApplicationListener < RefreshScopeRefreshedEvent >, InitializingBean

Every time the configuration is refreshed, a refresh event is published, and existing beans are eliminated and refactored (so the configuration must not be in this class)

/** * Check if it is necessary to rebuild the thread pool * if it is necessary to update the mailbox situation */
@Override
public void onApplicationEvent(RefreshScopeRefreshedEvent refreshScopeRefreshedEvent) {
    Boolean isEnable=ojMailSenderProperties.getEnableStable();
    if(isEnable ! =null&& isStableOpen ! = isEnable.booleanValue()) { isStableOpen = isEnable; }// Modify the mailbox Settings. If the new Settings are empty, no update will be made and a warning will be issued
    List<MailAccount> accounts = ojMailSenderProperties.getMailList();
    if (accounts==null || accounts.isEmpty()) {
        LOGGER.warn("Mail Account is null, settings keeping.");
    }else {
        List<MailNode> mails=new ArrayList<>();
        for (MailAccount account : accounts) {
            mails.add(new MailNode(account.getHost(), account.getUsername(), account.getPassword()));
        }
        rebuildMails(mails);
    }
    // Mask expiration Settings
    Duration recheckTime=ojMailSenderProperties.getRecheckTime();
    if(recheckTime ! =null && !recheckTime.isNegative()) {
        if (recheckTime.getSeconds()*1000! =this.mailHostWaitTime.get()) {
            this.mailHostWaitTime.set(recheckTime.getSeconds()*1000); }}// Alarm Settings
    Integer newMaxTaskNum = ojMailSenderProperties.getMaxTaskNum();
    if(newMaxTaskNum ! =null&& maxTaskNum.get()! =newMaxTaskNum.intValue()) { maxTaskNum.set(newMaxTaskNum); }// Check the thread pool size
    Integer newThreadNum=ojMailSenderProperties.getTaskRunNum();
    if(newMaxTaskNum ! =null&& newThreadNum ! = threadPool.getCorePoolSize()) { threadPool.setCorePoolSize(newThreadNum); threadPool.setMaximumPoolSize(newMaxTaskNum*2);
    }
    // Thread lifetime
    Duration newKeepAliveTime=ojMailSenderProperties.getThreadKeepLiveTime();
    if(newKeepAliveTime! =null &&
            newKeepAliveTime.getSeconds()!=threadPool.getKeepAliveTime(TimeUnit.SECONDS)) {
        threadPool.setKeepAliveTime(newKeepAliveTime.getSeconds(),TimeUnit.SECONDS);
    }
}

private void rebuildMails(List<MailNode> newMails) {}/** * After the bean property is set successfully, the system needs to be preset * mailbox, thread pool */
@Override
public void afterPropertiesSet(a){
    if(ojMailSenderProperties.getEnableStable() ! =null) {
        this.isStableOpen=ojMailSenderProperties.getEnableStable();
    }
    // Initialize the mailbox
    List<MailAccount> accounts = ojMailSenderProperties.getMailList();
    if (accounts==null || accounts.isEmpty()) {
        throw new IllegalArgumentException("Mail accounts can't be empty.");
    }
    mails=new ArrayList<>();
    for (MailAccount account : accounts) {
        mails.add(new MailNode(account.getHost(), account.getUsername(), account.getPassword()));
    }

    //set thread pool
    Integer taskRunNum=ojMailSenderProperties.getTaskRunNum();
    taskRunNum = taskRunNum >1 ? taskRunNum: 1;
    Integer taskRunNum1 = ojMailSenderProperties.getTaskRunNum();
    taskRunNum1 = taskRunNum1 > 5 ? taskRunNum1 : 5;
    Duration threadKeepLiveTime=ojMailSenderProperties.getThreadKeepLiveTime();
    if (threadKeepLiveTime.isNegative()){
        threadKeepLiveTime= Duration.ZERO;
    }

    long keepAliveTime = threadKeepLiveTime.getSeconds();
    TimeUnit timeUnit = TimeUnit.SECONDS;

    Duration recheckTime=ojMailSenderProperties.getRecheckTime();
    long recheckTimeMill = recheckTime.getSeconds() * 1000;

    maxTaskNum = new AtomicInteger(taskRunNum1);
    threadPool.setCorePoolSize(taskRunNum);
    threadPool.setMaximumPoolSize(taskRunNum1 *2);
    threadPool.setKeepAliveTime(keepAliveTime, timeUnit);
    mailHostWaitTime.set(recheckTimeMill);

}
Copy the code

Construction completed, wuhu take-off ~~

The code is in Github, welcome to bring partners to visit, need your small star~

Github repository address

All right, here we go. Please, one button, three

This article is participating in the “Nuggets 2021 Spring Recruitment Campaign”, click to see the details of the campaign