1. Requirement description

1.1 Scenario Description:

Business demand on the WeChat side keeps growing, and the monolithic project structure, which couples business logic with the handling of WeChat third-party events, has gradually shown that it cannot carry the load. The interaction with WeChat therefore needs to be separated from the business logic and managed on its own. This has the following advantages:

  1. Services are decoupled.
  2. It prepares the business systems for a move to microservices.
  3. Each business system can be optimized independently after decoupling.
  4. The impact of errors in a business system is reduced.

1.2 Technical difficulties:

WeChat delivers event notifications by sending an HTTP request to the callback address configured by the customer. There are two types of event notification:

  1. WeChat sends the HTTP request and data, and the customer's server replies with the SUCCESS string by default. After business processing is completed, the business system notifies WeChat through the specified HTTP address.
  2. WeChat sends the HTTP request and data, and the customer's server must return the result of the business processing in the response to that same request.

Because event notifications have been moved out of the main business system, the WeChat event management system publishes each event through MQ after receiving it from WeChat. The second type of notification, however, requires the business processing result to be returned to WeChat in the response to the original HTTP request. In this case the WeChat event management system blocks the HTTP request sent by WeChat until the business system finishes processing and returns the data. We therefore need a flexible and reliable container to manage the blocked WeChat requests.
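To make the blocking requirement concrete, here is a minimal sketch of the second notification type. It is not the container built later in this post: for brevity it uses `CompletableFuture` from the JDK instead of wait/notify, and the names `PendingReplies`, `awaitReply`, `complete`, and `eventId` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical registry of HTTP requests that are blocked while waiting for a business result.
final class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called on the HTTP thread: blocks until the result arrives or the timeout elapses. */
    String awaitReply(String eventId, long timeoutMillis, String defaultReply) {
        CompletableFuture<String> future =
                pending.computeIfAbsent(eventId, id -> new CompletableFuture<>());
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return defaultReply; // the business system was too slow or failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return defaultReply;
        } finally {
            pending.remove(eventId); // never keep a finished request registered
        }
    }

    /** Called when the business result comes back (for example from an MQ consumer). */
    boolean complete(String eventId, String reply) {
        CompletableFuture<String> future = pending.get(eventId);
        return future != null && future.complete(reply);
    }
}
```

In this sketch a result that arrives before the request is registered, or after it has timed out, is simply dropped (`complete` returns false); a real system would have to decide how to handle that case.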

2. Theoretical basis

2.1 Future Multi-threaded Model:

2.1.1 Model Introduction:

The Future multithreaded model is a common multithreaded blocking callback model. The specific logical structure is shown in the figure below:

The logic goes like this. When a request enters the Future model, the calling thread is blocked. The Future thread then either calls back later or starts waiting, until another thread wakes it up or the wait times out. At that point the Future thread wakes up and returns the concrete result to the calling thread.
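The flow described above can be sketched with the JDK's own `ExecutorService` and `Future`. This is only an illustration of the model; the class name `FutureModelDemo` and the helper `fetchWithTimeout` are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureModelDemo {
    /** Runs the task on another thread and blocks the caller until it finishes or times out. */
    static String fetchWithTimeout(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(task); // the "future thread" starts working
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS); // caller blocks here
            } catch (TimeoutException e) {
                future.cancel(true); // the wait timed out: give up on the task
                return fallback;
            } catch (ExecutionException e) {
                return fallback; // the task itself failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return fallback;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithTimeout(() -> "done", 1000, "timeout"));
        System.out.println(fetchWithTimeout(() -> {
            Thread.sleep(2000);
            return "late";
        }, 100, "timeout"));
    }
}
```

The first call returns the task's result; the second task sleeps longer than the timeout, so the caller gets the fallback value instead.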

2.2 Inter-thread Communication:

2.2.1 wait, notify, notifyAll

There are many ways to communicate between threads. This project uses the simple combination of wait, notify, and notifyAll. All three are methods of the Object class, and they operate on a specific object instance. A thread must therefore hold the monitor of that object before it can call them, which is normally done by acquiring the object lock with the synchronized keyword. If one of these methods is called without holding the object's monitor, Java throws an IllegalMonitorStateException. The purpose of this rule is to control access when the same object instance is used by multiple threads, and so to avoid synchronization problems.
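A small sketch of the monitor rule described above; the class and method names (`MonitorDemo`, `notifyWithoutLock`, `notifyWithLock`) are invented for the example.

```java
final class MonitorDemo {
    /** Calls notify() without holding the monitor; returns true if the JVM rejected the call. */
    static boolean notifyWithoutLock(Object lock) {
        try {
            lock.notify(); // the current thread does not own lock's monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Calls notify() inside a synchronized block, which is always allowed. */
    static boolean notifyWithLock(Object lock) {
        synchronized (lock) {
            lock.notify(); // fine: the monitor is held, even if no thread is waiting
            return true;
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        System.out.println("rejected without lock: " + notifyWithoutLock(lock));
        System.out.println("accepted with lock: " + notifyWithLock(lock));
    }
}
```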

2.2.1.1 wait

The wait method suspends the current thread on the object instance. A timeout can be passed in; once it elapses, the thread wakes up automatically.
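As a hedged illustration of a timed wait, the sketch below waits for a flag with a deadline. The names `TimedWaitDemo`, `awaitReady`, and `signal` are hypothetical. The loop is there because a thread may wake up without having been notified, so the condition and the remaining time are rechecked after every wakeup.

```java
import java.util.concurrent.TimeUnit;

final class TimedWaitDemo {
    private final Object lock = new Object();
    private boolean ready;

    /** Returns true if signal() was called before the timeout elapsed, false otherwise. */
    boolean awaitReady(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            try {
                while (!ready) {
                    long remainingMillis =
                            TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remainingMillis <= 0) {
                        return false; // timed out
                    }
                    lock.wait(remainingMillis); // releases the monitor while suspended
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Sets the flag and wakes up every thread waiting on the lock. */
    void signal() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }
}
```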

2.2.1.2 notify

The notify method wakes up one thread that is waiting on the corresponding object instance. Note that this is not fair: which thread is woken up is decided by the JVM.

2.2.1.3 notifyAll

notifyAll, as its name implies, wakes up all threads waiting on the object instance.
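The following sketch parks several threads on one lock and releases them with a single notifyAll call. All names (`NotifyAllDemo`, `wakeAll`, `park`, `release`) are made up for the example. Replacing the notifyAll in `release` with notify would guarantee only one wakeup, and the remaining waiters would stay suspended.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class NotifyAllDemo {
    private final Object lock = new Object();
    private boolean released;
    private int waiting;

    /** Parks `count` threads on the lock, releases them together, returns how many woke up. */
    static int wakeAll(int count) {
        NotifyAllDemo demo = new NotifyAllDemo();
        CountDownLatch woken = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> {
                if (demo.park()) {
                    woken.countDown();
                }
            });
            t.setDaemon(true);
            t.start();
        }
        try {
            demo.awaitWaiters(count); // make sure every thread is really waiting
            demo.release();
            woken.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count - (int) woken.getCount();
    }

    private boolean park() {
        synchronized (lock) {
            waiting++;
            lock.notifyAll(); // lets awaitWaiters recheck its counter
            try {
                while (!released) {
                    lock.wait();
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private void awaitWaiters(int count) throws InterruptedException {
        synchronized (lock) {
            while (waiting < count) {
                lock.wait();
            }
        }
    }

    private void release() {
        synchronized (lock) {
            released = true;
            lock.notifyAll(); // wakes every thread suspended on this lock
        }
    }
}
```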

3. Implementation ideas:

  1. Fault tolerance: The system distributes messages to multiple business services and returns their replies, so it must be able to handle business system timeouts. The blocking service therefore has a timeout setting.
  2. Continuous service capability: We need to provide continuous and stable service, so the project manages overflow of blocked requests. Once the number of blocked requests exceeds a configured maximum, requests beyond the limit are returned the default value directly. Business services must therefore be idempotent, so that a request answered with the default value because of overflow does not lead to failed processing or inconsistent business data.
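The idempotency requirement in point 2 can be sketched as follows, under the assumption that every event carries a unique message id and that the same event may be delivered to a business service more than once. `IdempotentHandler` and `handle` are hypothetical names, and the in-memory set stands in for whatever persistent store a real service would use.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class IdempotentHandler {
    // Ids of events that have already been applied (in memory only, for illustration).
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Runs the business action once per message id; returns false for a duplicate delivery. */
    boolean handle(String messageId, Runnable businessAction) {
        if (!processed.add(messageId)) {
            return false; // already handled: skip so the business data is not changed twice
        }
        businessAction.run();
        return true;
    }
}
```

With this guard in place, a second delivery of the same event is a no-op instead of a second business operation.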

4. Concrete implementation:

ThreadHolder (message carrier):

import lombok.Data;

import java.util.concurrent.Callable;

/**
 * <p>
 * Description: com.javanewb.service
 * </p>
 * date:2017/10/31
 *
 * @author Dean.Hwang
 */
@Data
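public abstract class ThreadHolder<T> implements Callable<T> {
    // TODO normal logic processing, as well as default data return
    protected abstract T proData();

    private T defaultData; // The default data returned
    private Object needProData; // Receive data that needs to be processed
    private Long createTime = System.currentTimeMillis();
    private Long maxWaitTime; // Maximum blocking time in milliseconds; must be positive
    private String mdc;
    private RequestHolder<T> holder;
    private boolean notified; // Set by notifyThread/notifyDefault so an early notify is not lost

    @Override
    public T call() throws Exception {
        waitThread();
        System.out.println("Thread mdc:" + mdc + " notify");
        if (needProData == null) {
            // Timed out or released with the default value
            holder.removeThread(mdc, false);
            return defaultData;
        }
        return proData();
    }

    public synchronized void waitThread() throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitTime;
        long remaining = maxWaitTime;
        // Loop to guard against spurious wakeups and a notify that arrived before the wait
        while (!notified && remaining > 0) {
            this.wait(remaining);
            remaining = deadline - System.currentTimeMillis();
        }
    }

    public synchronized void notifyThread(Object needProData) {
        this.needProData = needProData;
        this.notified = true;
        this.notify();
    }

    public synchronized void notifyDefault() {
        this.notified = true;
        this.notify();
    }
}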

RequestHolder:


import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * <p>
 * Description: com.javanewb.entity
 * </p>
 * date:2017/10/26
 *
 * @author Dean.Hwang
 */
public class RequestHolder<T> {
    private Integer maxSize;
    private Long waitTime;

    public RequestHolder(Integer maxSize, Long maxWait, ExecutorService executorService) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than max size num");
        }
        this.maxSize = maxSize;
        this.waitTime = maxWait;
        if (executorService != null) {
            this.executorService = executorService;
        } else {
            this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
        }
    }

    public RequestHolder(Integer maxSize, Long maxWait) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than max size num");
        }
        this.waitTime = maxWait;
        this.maxSize = maxSize;
        this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
    }

    private ExecutorService executorService;
    private final Map<String, ThreadHolder<T>> holderMap = new ConcurrentHashMap<>();
    private List<String> mdcOrderList = new CopyOnWriteArrayList<>();
    private AtomicBoolean isCleaning = new AtomicBoolean(false);

    public ThreadHolder<T> removeThread(String mdc, boolean needNotifyDefault) {
        mdcOrderList.remove(mdc);
        ThreadHolder<T> holder;
        synchronized (holderMap) {
            holder = holderMap.get(mdc);
            holderMap.remove(mdc);
        }
        if (holder != null && needNotifyDefault) {
            holder.notifyDefault();
        }
        return holder;
    }

    public void notifyThread(String mdc, Object data) {
        ThreadHolder<T> holder = removeThread(mdc, false);
        if (holder != null) {
            holder.notifyThread(data);
        }
    }

    public Future<T> getFuture(String mdcStr, Class<? extends ThreadHolder<T>> holder) {
        if (StringUtil.isEmpty(mdcStr) || holder == null) {
            throw new BusinessException(1020, "Mdc target missing!!!");
        }
        Future<T> future;
        try {
            ThreadHolder<T> thread = holder.newInstance();
            holderMap.put(mdcStr, thread);
            mdcOrderList.add(mdcStr);
            thread.setMaxWaitTime(waitTime);
            thread.setMdc(mdcStr);
            thread.setHolder(this);
            future = executorService.submit(thread);
            cleanThreadPool();
        } catch (InstantiationException | IllegalAccessException e) {
            holderMap.remove(mdcStr);
            mdcOrderList.remove(mdcStr);
            throw new BusinessException(1021, "Thread Holder initialized failed");
        }
        return future;
    }

    private void cleanThreadPool() {
        if (mdcOrderList.size() >= maxSize && isCleaning.compareAndSet(false, true)) {
            try {
                // Release the oldest blocked requests with the default value until at most maxSize remain
                // TODO: see if parallel stream processing should be used
                mdcOrderList.subList(0, mdcOrderList.size() - maxSize).forEach(
                        mdc -> removeThread(mdc, true));
            } finally {
                isCleaning.set(false);
            }
        }
    }
}

TestController (test entry):

import com.javanewb.entity.TestThreadHolder;
import com.javanewb.thread.tools.RequestHolder;
import com.keruyun.portal.common.filter.LoggerMDCFilter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * <p>
 * Description: com.javanewb.controller
 * </p>
 * <p>
 * Copyright: Copyright (c) 2015
 * </p>
 * date:2017/10/25
 *
 * @author Dean.Hwang
 */
@Api
@RestController
@Slf4j
public class TestController {
    private RequestHolder<String> holder = new RequestHolder<>(100, 500000L);
    private List<String> mdcList = new ArrayList<>();

    @ApiOperation(value = "Request synchronization test", notes = "Request synchronization test")
    @RequestMapping(value = "/async", method = RequestMethod.GET)
    public void async(HttpServletRequest request, HttpServletResponse response, String id) {
        Long startTime = System.currentTimeMillis();
        String mdc = MDC.get(LoggerMDCFilter.IDENTIFIER);
        mdcList.add(mdc);
        Future<String> future = holder.getFuture(id, TestThreadHolder.class);
        log.info(Thread.currentThread().getName());
        try {
            System.out.println(mdc + " Thread Wait");
            String result = future.get();
            response.getOutputStream().print(result);
            System.out.println(" time: " + (System.currentTimeMillis() - startTime));
        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @ApiOperation(value = "Release list number one", notes = "Request synchronization test")
    @RequestMapping(value = "/notify", method = RequestMethod.GET)
    public String notifyFirst() {
        String mdc = mdcList.get(0);
        mdcList.remove(0);
        holder.notifyThread(mdc, "");
        return mdc;
    }

    @ApiOperation(value = "Release list number one", notes = "Request synchronization test")
    @RequestMapping(value = "/notifyThis", method = RequestMethod.GET)
    public String notifyThis(String mdc) {
        int idx = 0;
        for (int i = 0; i < mdcList.size(); i++) {
            if (mdcList.get(i).equals(mdc)) {
                idx = i;
                break;
            }
        }
        mdcList.remove(idx);
        holder.notifyThread(mdc, "");
        return mdc;
    }
}

5. Team:

This project will be posted on GitHub. If you are interested, or find a bug that needs fixing, you can contact me directly through the blog, or go to the GitHub address: github.com/crowhyc/Thr… Email address: [email protected]