The original address: www.xilidou.com/2018/01/22/…
In high concurrency systems, we often encounter such requirements: the system generates a large number of requests, but these requests are not high real-time requirements. We can combine those requests, and when we reach a certain number we submit them all. Maximize the utilization of system IO to improve system throughput performance.
So the request merge framework needs to consider the following two requirements:
- Submit data when a certain number of requests have been collected
- After a certain period of time if the number of requests did not reach the specified number also commit
Let’s talk about how to implement such a requirement.
Read this article and you will learn:
- ScheduledThreadPoolExecutor
- Blocking queue
- Thread-safe parameters
- The use of LockSuppor
Design ideas and implementation
So let’s talk a little bit about what’s the idea behind this. I hope you can learn to analyze problems and design some routines of modules.
- What underlying data structure is used to hold the requests that need to be merged?
- Since our system is used in a high concurrency environment, we certainly can’t use ordinary
ArrayList
To hold. We can use blocking queues to hold requests that need to be merged. - Our data structure needs to provide an add() method externally for submitting data. After the external add data, we need to check whether the number of data in the queue has reached our limit. Quantity reached submit data, not reached continue to wait.
- The data structure also needs to provide a timeOut() method that is periodically called by an external timer and, if called, directly submits data to the remote.
- When the condition is met, the thread performs the submission action; when the condition is not met, the thread should pause and wait for the queue to reach the condition of submitting data. So we can consider using
LockSuppor.park()
andLockSuppor.unpark
To suspend and activate the action thread.
- Since our system is used in a high concurrency environment, we certainly can’t use ordinary
After the above analysis, we have this data structure:
private static class FlushThread<Item> implements Runnable{
private final String name;
// Queue size
private final int bufferSize;
// Operation interval
private int flushInterval;
// The time of the last submission.
private volatile long lastFlushTime;
private volatile Thread writer;
// A blocking queue holding data
private final BlockingQueue<Item> queue;
// How to implement the conditions after they are reached
private final Processor<Item> processor;
// constructor
public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
this.name = name;
this.bufferSize = bufferSize;
this.flushInterval = flushInterval;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue<>(queueSize);
}
// The method of submitting data externally
public boolean add(Item item){
boolean result = queue.offer(item);
flushOnDemand();
return result;
}
// The timeout method provided to the external
public void timeOut(a){
// More than two commits exceed the time interval
if(System.currentTimeMillis() - lastFlushTime >= flushInterval){ start(); }}// Unblock the thread
private void start(a){
LockSupport.unpark(writer);
}
// Whether the current data is greater than the submitted condition
private void flushOnDemand(a){
if(queue.size() >= bufferSize){ start(); }}// Execute the method to submit data
public void flush(a){
lastFlushTime = System.currentTimeMillis();
List<Item> temp = new ArrayList<>(bufferSize);
int size = queue.drainTo(temp,bufferSize);
if(size > 0) {try {
processor.process(temp);
}catch (Throwable e){
log.error("process error",e); }}}// Decide whether to commit according to the size and time interval of the data
private boolean canFlush(a){
return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
}
@Override
public void run(a) {
writer = Thread.currentThread();
writer.setName(name);
while(! writer.isInterrupted()){while(! canFlush()){// If the thread is not interrupted and the condition for execution is not met, the thread is blocked
LockSupport.park(this); } flush(); }}}Copy the code
- How do you implement timed commits?
We usually meet timing relevant requirements, the first thought should be use FlushThread ScheduledThreadPoolExecutor timing to call a timeOut method, if you think of the Thread. The sleep ()… That needs to study hard again, read the source code.
- How to further improve system throughput?
The FlushThread we use implements Runnable, so we can consider using a thread pool to hold multiple Flushthreads.
So we have code like this:
public class Flusher<Item> {
private final FlushThread<Item>[] flushThreads;
private AtomicInteger index;
// Prevent multiple threads from executing simultaneously. Add a random number interval
private static final Random r = new Random();
private static final int delta = 50;
private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);
private static ExecutorService POOL = Executors.newCachedThreadPool();
public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {
this.flushThreads = new FlushThread[threads];
if(threads > 1){
index = new AtomicInteger();
}
for (int i = 0; i < threads; i++) {
final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
flushThreads[i] = flushThread;
POOL.submit(flushThread);
// Call the timeOut() method periodically.TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS); }}// mod index to ensure that all threads can be added
public boolean add(Item item){
int len = flushThreads.length;
if(len == 1) {return flushThreads[0].add(item);
}
int mod = index.incrementAndGet() % len;
return flushThreads[mod].add(item);
}
// As described above
private static class FlushThread<Item> implements Runnable{
...省略
}
}
Copy the code
- Interface oriented programming to improve system expansibility:
public interface Processor<T> {
void process(List<T> list);
}
Copy the code
use
Let’s write a test method to test:
// The Processor outputs the entire String
public class PrintOutProcessor implements Processor<String>{
@Override
public void process(List<String> list) {
System.out.println("start flush");
list.forEach(System.out::println);
System.out.println("end flush"); }}Copy the code
public class Test {
public static void main(String[] args) throws InterruptedException {
Flusher<String> stringFlusher = new Flusher<>("test".5.1000.30.1.new PrintOutProcessor());
int index = 1;
while (true){
stringFlusher.add(String.valueOf(index++));
Thread.sleep(1000); }}}Copy the code
Results of execution:
start flush
1
2
3
end flush
start flush
4
5
6
7
end flush
Copy the code
We found that it did not take five digits to trigger flush. Flush was executed even though the required five numbers were not reached because a timeout commit was triggered.
If we remove Thread.sleep(1000); And look at the results:
start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
Copy the code
Every five numbers are submitted. Perfect…
conclusion
A more vivid example to explain some of the specific use of multithreading. Learning multithreading should be more thinking more hands-on, will have a better effect. I hope that after reading this article, you will have a harvest. Welcome to exchange.
Making address: github.com/diaozxin007…
Free hand lift frame series article address:
Hands-free framework – Implementing IoC
Hands-free framework – Implementing Aop