This is the fifth day of my participation in the August Writing Challenge.
The JDK provides four synchronization utility classes
A synchronizer can be any object that coordinates the control flow of threads based on its own state.
Blocking queues can act as synchronizers.
The producer thread puts tasks into the queue, and the consumer thread takes tasks from the blocking queue. This decouples producer and consumer threads to some extent: when tasks are produced and consumed at different rates, the queue absorbs the difference so that neither side has to wait for the other on every task.
Other synchronizers include semaphores (Semaphore), barriers (CyclicBarrier), FutureTask, and latches (CountDownLatch).
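The hand-off described above can be sketched as follows. This is a minimal illustration, not code from the article's repository: the class name QueueHandoffDemo, the method transfer, and the queue capacity of 2 are all assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueHandoffDemo {
    /** Moves the numbers 1..count through a small bounded queue and returns their sum. */
    public static int transfer(int count) throws InterruptedException {
        // Capacity 2 is deliberately small so the producer blocks when it gets ahead.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes to sum are visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(10)); // prints 55
    }
}
```

Neither thread knows about the other; they only share the queue, and put() and take() do all the waiting.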
1.1 Semaphore
A Semaphore controls how many operations can access a given resource at the same time. It can act as a throttle and can be used to implement resource pools.
A Semaphore manages a virtual set of permits; the initial number of permits is passed to the Semaphore constructor.
Semaphore semp = new Semaphore(5);
An operation acquires a permit before it runs and releases the permit when it is done. If no permit is available, acquire() blocks until one is. acquire() acquires a permit, and release() releases one.
A Semaphore can be used to build a database connection pool. A simple fixed-size pool fails when a resource is requested from an empty pool; with a Semaphore guarding the pool, the request blocks until a resource is returned instead.
A Semaphore can also turn a collection into a bounded blocking container. Take the following code as an example. Adding an element first acquires a permit; if none is available, the add blocks until one is. Removing an element releases a permit.
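A rough sketch of a semaphore-guarded pool is shown below. The class SemaphorePoolDemo, its methods, and the "conn-N" strings standing in for real connections are hypothetical names for illustration only.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphorePoolDemo {
    private final Queue<String> resources = new ConcurrentLinkedQueue<>();
    private final Semaphore permits;

    public SemaphorePoolDemo(int size) {
        for (int i = 0; i < size; i++) {
            resources.add("conn-" + i); // placeholder for a real connection
        }
        permits = new Semaphore(size);
    }

    /** Blocks until a resource is free. */
    public String borrow() throws InterruptedException {
        permits.acquire();
        return resources.poll(); // a held permit guarantees a resource is in the queue
    }

    /** Waits at most the given time; returns null if the pool stayed empty. */
    public String tryBorrow(long timeout, TimeUnit unit) throws InterruptedException {
        return permits.tryAcquire(timeout, unit) ? resources.poll() : null;
    }

    public void giveBack(String resource) {
        resources.add(resource);
        permits.release(); // release only after the resource is back in the queue
    }

    public static void main(String[] args) throws InterruptedException {
        SemaphorePoolDemo pool = new SemaphorePoolDemo(2);
        String a = pool.borrow();
        String b = pool.borrow();
        System.out.println(pool.tryBorrow(100, TimeUnit.MILLISECONDS)); // prints null
        pool.giveBack(a);
        System.out.println(pool.tryBorrow(100, TimeUnit.MILLISECONDS)); // prints conn-0
    }
}
```

The number of permits always equals the number of resources currently in the queue, which is why borrow() never has to check for an empty queue itself.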
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class Test0514BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public Test0514BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        // Acquire a permit first; blocks while the set is full
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            // Nothing was added (duplicate or exception), so give the permit back
            if (!wasAdded) {
                sem.release();
            }
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            // An element was removed, so release a permit
            sem.release();
        }
        return wasRemoved;
    }
}
1.2 Barrier (CyclicBarrier)
A barrier blocks a group of threads until all of them have reached the barrier point; only then do they all continue.
For example, when making tea you have to wait until the teapot is washed, the teacups are washed and the tea leaves are fetched before you can brew. (This can also be implemented with CompletableFuture.)
In another example, several families decide to meet somewhere: "Everyone meet at McDonald's at 6:00; once you get there, wait until everyone else shows up, and then we'll decide what to do next."
CyclicBarrier allows a fixed number of parties to meet repeatedly at a barrier point, which is very useful in parallel iterative algorithms. See the reconciliation system example in "19 CountDownLatch and CyclicBarrier: How to keep multiple threads in step?" from Wang Baoling's "Java Concurrent Programming in Practice".
CountDownLatch mainly covers the case where one thread waits for several others. CyclicBarrier, on the other hand, is a group of threads waiting for each other, more like a bunch of traveling buddies. In addition, a CountDownLatch counter cannot be reused: once the counter has dropped to zero, any thread that calls await() passes straight through. A CyclicBarrier counter is reusable and automatically resets to its initial value once it drops to zero. CyclicBarrier can also be given a callback function, which makes it quite feature-rich.
A thread calls await() when it reaches the barrier point, and the call blocks until all threads have reached it. Once all threads have arrived, the barrier opens, all threads are released, and the barrier is reset (the number of parties returns to its initial value) for the next use.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * CyclicBarrier test.
 * <p>
 * The optimization here: while check() runs, the next round of the two
 * time-consuming queries (waybill repository and order repository) is already under way.
 */
public class TestCyclicBarrier {
    // Thread pool that runs the callback
    Executor executor = Executors.newFixedThreadPool(1);
    final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
        executor.execute(() -> check());
    });

    public static void main(String[] args) {
        TestCyclicBarrier barrier = new TestCyclicBarrier();
        barrier.checkAll();
    }

    void check() {
        System.out.println(">>>> check");
    }

    void checkAll() {
        Thread t1 = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    System.out.println("query order repository");
                    // Wait at the barrier
                    barrier.await();
                    System.out.println(">> count = " + barrier.getNumberWaiting());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    System.out.println("query waybill repository");
                    Thread.sleep(2000);
                    // Wait at the barrier
                    barrier.await();
                    System.out.println(">> count = " + barrier.getNumberWaiting());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}
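The automatic reset can be observed in a small finite run. The sketch below is an assumption-laden illustration (the class BarrierRoundsDemo and the method run are made up for this example): two workers meet at the same barrier for several rounds, and the barrier action counts how often the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierRoundsDemo {
    /** Runs two workers through the given number of rounds; returns how often the barrier tripped. */
    public static int run(int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);

        Runnable worker = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // the same barrier object is reused every round
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };

        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3)); // prints 3
    }
}
```

No explicit reset() call is needed between rounds; the barrier is ready for the next await() as soon as it has tripped.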
1.3 FutureTask
The computation represented by a FutureTask is implemented by a Callable, the result-bearing equivalent of Runnable. Once a FutureTask enters the completed state, it stays there forever.
FutureTask is often used to start a long computation ahead of time, so that less time is spent waiting when the result is actually needed later.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Use FutureTask to preload data that is needed later.
 * <p>
 * Pay attention to the exception handling around future.get().
 * ProductInfo and DataLoadException are application classes that are not shown here.
 */
public class Test0512Preloader {
    private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
        @Override
        public ProductInfo call() throws DataLoadException {
            return loadProductInfo();
        }
    });
    private final Thread thread = new Thread(future);

    public void start() {
        thread.start();
    }

    public ProductInfo get() throws DataLoadException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException) {
                throw (DataLoadException) cause;
            } else {
                throw launderThrowable(cause);
            }
        }
    }

    /**
     * Converts an unexpected Throwable into an unchecked exception.
     */
    private RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException) {
            return (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new IllegalStateException("Not unchecked", t);
        }
    }

    public ProductInfo loadProductInfo() {
        return new ProductInfo();
    }
}
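Because the preloader above depends on classes that are not shown, here is a smaller self-contained sketch of the same idea. The class FutureTaskDemo and the trivial computation are placeholders, not part of the original code; the point is only that a completed FutureTask keeps its result and does not run again.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureTaskDemo {
    /** Returns {first result, second result, number of times the computation ran}. */
    public static int[] run() throws InterruptedException, ExecutionException {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> {
            calls.incrementAndGet();
            return 6 * 7; // stands in for an expensive computation
        });

        new Thread(task).start(); // start the work early
        int first = task.get();   // blocks until the computation is done
        task.run();               // a completed FutureTask ignores further run() calls
        int second = task.get();  // returns the stored result immediately
        return new int[] {first, second, calls.get()};
    }

    public static void main(String[] args) throws Exception {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 42 42 1
    }
}
```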
1.4 Latch (CountDownLatch)
A latch acts like a gate: until the latch reaches its terminal state the gate stays closed and no thread can pass; once the terminal state is reached, the gate opens and all threads pass through.
Here too, see the reconciliation system example in "19 CountDownLatch and CyclicBarrier: How to keep multiple threads in step?" from Wang Baoling's "Java Concurrent Programming in Practice".
Another example:
import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);
        new Thread() {
            public void run() {
                try {
                    System.out.println("Child thread " + Thread.currentThread().getName() + " is running");
                    Thread.sleep(3000);
                    System.out.println("Child thread " + Thread.currentThread().getName() + " is done");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            public void run() {
                try {
                    System.out.println("Child thread " + Thread.currentThread().getName() + " is running");
                    Thread.sleep(3000);
                    System.out.println("Child thread " + Thread.currentThread().getName() + " is done");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
        try {
            System.out.println("Waiting for 2 child threads to finish...");
            latch.await();
            System.out.println("2 child threads have finished");
            System.out.println("Continue executing the main thread");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The output (the line order may vary slightly between runs):
Child thread Thread-0 is running
Waiting for 2 child threads to finish...
Child thread Thread-1 is running
Child thread Thread-0 is done
Child thread Thread-1 is done
2 child threads have finished
Continue executing the main thread
A barrier is used to wait for other threads, whereas a latch is used to wait for an event.
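To illustrate "waiting for an event", here is a sketch of a starting gate: a latch with a count of 1 that several workers wait on until the main thread opens it. The names StartGateDemo, startGate and endGate are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGateDemo {
    /** Returns {workers finished before the gate opened, workers finished at the end}. */
    public static int[] run(int workers) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);     // the event everybody waits for
        CountDownLatch endGate = new CountDownLatch(workers); // lets the caller wait for all workers
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await(); // blocked until the event happens
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endGate.countDown();
                }
            }).start();
        }

        Thread.sleep(100);
        int beforeOpen = finished.get(); // the gate is still closed, so no worker has passed
        startGate.countDown();           // the event happens: all workers proceed
        endGate.await();
        return new int[] {beforeOpen, finished.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(3);
        System.out.println(r[0] + " " + r[1]); // prints 0 3
    }
}
```

The workers do not wait for each other here; they all wait for a single countDown() call, which is the difference from a barrier.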
How to gracefully stop a thread
2.1 Method 1: Use a cancellation flag
Use a volatile cancelled flag: the thread checks the flag on every loop iteration and stops once it is true.
Example:
import static java.util.concurrent.TimeUnit.SECONDS;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class Test0701CancelThread implements Runnable {
    public static void main(String[] args) {
        Test0701CancelThread cancelThread = new Test0701CancelThread();
        try {
            cancelThread.aSecondOfPrimes();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private final List<BigInteger> primes = new ArrayList<>();
    // volatile, so that the worker thread sees the change made by cancel()
    private volatile boolean cancelled;

    @Override
    public void run() {
        System.out.println("thread running");
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        System.out.println("cancel thread");
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<>(primes);
    }

    /**
     * Test method: the prime generator is stopped after 1 second.
     */
    public List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        Test0701CancelThread cancelThread = new Test0701CancelThread();
        new Thread(cancelThread).start();
        try {
            SECONDS.sleep(1);
        } finally {
            cancelThread.cancel();
        }
        return cancelThread.get();
    }
}
Problem:
1. If the task calls a blocking method (for example BlockingQueue.put()), it may never check the cancellation flag again, and the thread can then never be terminated.
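The problem can be reproduced with a producer that writes into a full queue nobody reads from. This is a sketch under assumptions: BlockedProducerDemo and its fields are invented for the demonstration, and it relies on ArrayBlockingQueue.put() parking the thread (thread state WAITING) while the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockedProducerDemo {
    private static volatile boolean cancelled;

    /** Returns {alive after setting the flag, alive after interrupt()}. */
    public static boolean[] run() throws InterruptedException {
        cancelled = false;
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); // nobody consumes
        Thread producer = new Thread(() -> {
            try {
                int i = 0;
                while (!cancelled) {
                    queue.put(i++); // the second put() blocks: the queue is full
                }
            } catch (InterruptedException e) {
                // interruption is the only way out of the blocked put()
            }
        });
        producer.start();

        // Wait until the producer is parked inside put().
        while (producer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }

        cancelled = true;   // the flag is set, but the thread never gets to check it
        producer.join(300);
        boolean aliveAfterFlag = producer.isAlive();

        producer.interrupt(); // put() responds to interruption
        producer.join(2000);
        boolean aliveAfterInterrupt = producer.isAlive();
        return new boolean[] {aliveAfterFlag, aliveAfterInterrupt};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints true false
    }
}
```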
2.2 Method 2: Use Thread.interrupt()
Interrupt the thread through the interrupt() method of the Thread class. This is the recommended way to stop a thread: it does not stop the thread immediately, but gives it the chance to finish or save what is necessary and then exit safely.
Example:
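import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;

public class Test0705CancelThread extends Thread {
    private final BlockingQueue<BigInteger> queue;

    public Test0705CancelThread(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            // Check the interrupt status on every iteration
            while (!Thread.currentThread().isInterrupted()) {
                // put() also throws InterruptedException if interrupted while blocked
                queue.put(p = p.nextProbablePrime());
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in put(): fall through and let the thread exit
            e.printStackTrace();
        }
    }

    public void cancel() {
        interrupt();
    }
}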
Note:
- Calling interrupt does not immediately stop what the target thread is doing, but merely passes a message requesting an interruption.
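- The static Thread.interrupted() method clears the interrupt status as it reads it. If it returns true and you do not intend to swallow the interruption, either throw InterruptedException or restore the status by calling Thread.currentThread().interrupt().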
- Often, interrupts are the most logical way to implement cancellation.
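The difference between isInterrupted(), Thread.interrupted() and restoring the status can be seen in a short sketch. RestoreInterruptDemo and the pause helper are hypothetical names; the Thread methods are the standard JDK ones.

```java
public class RestoreInterruptDemo {
    /** A helper that cannot propagate InterruptedException, so it restores the status instead. */
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // sleep() cleared the status when it threw; put it back for the caller
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {isInterrupted() after pause, Thread.interrupted(), isInterrupted() afterwards}. */
    public static boolean[] run() throws InterruptedException {
        boolean[] seen = new boolean[3];
        Thread worker = new Thread(() -> {
            pause(10_000);
            seen[0] = Thread.currentThread().isInterrupted(); // reads the status, does not clear it
            seen[1] = Thread.interrupted();                   // reads AND clears the status
            seen[2] = Thread.currentThread().isInterrupted(); // now cleared
        });
        worker.start();
        worker.interrupt();
        worker.join(); // after join(), the worker's writes to seen are visible
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true true false
    }
}
```

Without the interrupt() call inside pause(), the caller would see false in all three positions and the cancellation request would be lost.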
2.3 Method 3: Cancel the thread through the Future
The Future interface provides a cancel() method for cancelling a submitted task.
Example:
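import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Test0710FutureCancelThread {
    private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(5);

    public static void timeRun(final Runnable r, long timeout, TimeUnit unit) {
        Future<?> task = cancelExec.submit(r);
        try {
            // Wait at most the given timeout for the task to finish
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // The task took too long: cancel it (it is interrupted if it is running)
            task.cancel(true);
        } catch (InterruptedException e) {
            // The caller was interrupted: cancel the task and restore the interrupt status
            task.cancel(true);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // If the task has already completed, cancelling has no effect
            // If the task is still executing, it will be interrupted
            task.cancel(true);
        }
    }
}
Cancelling through the Future is also a sound way to stop a task. When Future.get throws InterruptedException or TimeoutException and you know the result is no longer needed, call Future.cancel to cancel the task.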
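What cancel(true) does to a running task can be checked with a small experiment. This is a sketch with invented names (FutureCancelDemo, started, sawInterrupt); it submits a sleeping task, lets get() time out, cancels, and then checks that the task was actually interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureCancelDemo {
    /** Returns {get() timed out, task.isCancelled(), task saw the interrupt}. */
    public static boolean[] run() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch sawInterrupt = new CountDownLatch(1);
        try {
            Future<?> task = exec.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stands in for long-running work
                } catch (InterruptedException e) {
                    sawInterrupt.countDown(); // cancel(true) arrives as an interrupt
                }
            });
            started.await(); // make sure the task is running before the timed wait

            boolean timedOut = false;
            try {
                task.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true;
                task.cancel(true); // the result is no longer needed
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
            boolean interrupted = sawInterrupt.await(2, TimeUnit.SECONDS);
            return new boolean[] {timedOut, task.isCancelled(), interrupted};
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true true true
    }
}
```

Cancellation through a Future therefore only works for tasks that respond to interruption; a task that ignores interrupts keeps running even though isCancelled() returns true.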
2.4 Method 4: Call shutdown() to shut down the ExecutorService
A thread pool can be stopped by calling the shutdown() method of ExecutorService: the pool stops accepting new tasks, and the tasks that were already submitted are still executed.
Example:
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class Test0716CloseExecutorService {
    private static final long TIMEOUT = 1000L;
    private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
    private final ExecutorService exec = Executors.newScheduledThreadPool(5);
    private final PrintWriter writer;

    public Test0716CloseExecutorService(PrintWriter writer) {
        this.writer = writer;
    }

    public void start() {
    }

    public void stop() throws InterruptedException {
        try {
            // To stop the pool, call the following two statements
            exec.shutdown();
            // Wait for the tasks to complete after calling shutdown():
            // blocks until all tasks have completed execution after a shutdown
            // request, or the timeout occurs, or the current thread is
            // interrupted, whichever happens first.
            exec.awaitTermination(TIMEOUT, UNIT);
        } finally {
            writer.close();
        }
    }

    public void log(String msg) {
        try {
            // WriteTask is a Runnable that is not shown here
            exec.execute(new WriteTask(msg));
        } catch (RejectedExecutionException ignored) {
            // The pool has been shut down and no longer accepts tasks
        }
    }
}
The core statement is as follows:
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
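The effect of these two calls can be observed directly. The following sketch uses hypothetical names (GracefulShutdownDemo, completed) and adds a shutdownNow() fallback for tasks that overrun the timeout, which is a common pattern rather than something the original class does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownDemo {
    /** Returns {tasks completed, 1 if a task submitted after shutdown() was rejected}. */
    public static int[] run() throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            exec.execute(completed::incrementAndGet);
        }

        exec.shutdown(); // no new tasks; the five submitted ones still run

        int rejected = 0;
        try {
            exec.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = 1; // submissions after shutdown() are refused
        }

        // Give the submitted tasks time to finish; interrupt them if they overrun.
        if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
            exec.shutdownNow();
        }
        return new int[] {completed.get(), rejected};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints 5 1
    }
}
```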
2.5 Method 5: Stop producer-consumer threads with a poison pill object
In the producer-consumer pattern, the threads are stopped by putting a poison pill object on the queue.
Example:
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;

public class Test0717PoisonPillCancelThread {
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    private final FileFilter fileFilter;
    private final File root;

    public Test0717PoisonPillCancelThread(BlockingQueue<File> queue, FileFilter fileFilter, File root) {
        this.queue = queue;
        this.fileFilter = fileFilter;
        this.root = root;
    }

    public void start() {
        producer.start();
        consumer.start();
    }

    public void stop() {
        producer.interrupt();
    }

    public void awaitTermination() throws InterruptedException {
        consumer.join();
    }

    /**
     * Consumer
     */
    public class IndexerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    File file = queue.take();
                    // The poison pill object stops the consumer
                    if (file == POISON) {
                        break;
                    } else {
                        indexFile(file);
                    }
                }
            } catch (InterruptedException e) {
                // deal with exception
            }
        }

        private void indexFile(File file) {
        }
    }

    /**
     * Producer
     */
    public class CrawlerThread extends Thread {
        @Override
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                // deal with exception
            } finally {
                // Always put the poison pill on the queue, retrying if interrupted
                while (true) {
                    try {
                        queue.put(POISON);
                        break;
                    } catch (InterruptedException e1) {
                        // deal with exception
                    }
                }
            }
        }

        private void crawl(File root) throws InterruptedException {
        }
    }
}
A poison pill is an object placed on the queue that means: when you take this object, stop immediately.
Poison pills work reliably only when the number of producers and consumers is known. With several producers, each producer puts one pill on the queue, and the consumer stops only after it has received as many pills as there are producers.
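A multi-producer variant might look like the sketch below. Everything in it is an assumption for illustration (the class MultiProducerPillDemo, the String items, the unbounded LinkedBlockingQueue); the consumer runs in the calling thread and counts pills until every producer has checked out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MultiProducerPillDemo {
    // A unique instance that is compared by identity, never by content
    private static final String POISON = new String("poison");

    /** Starts the producers, consumes until every producer's pill has arrived, returns the item count. */
    public static int run(int producers, int itemsEach) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        queue.put("item-" + id + "-" + i);
                    }
                    queue.put(POISON); // each producer's last element is its pill
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        int consumed = 0;
        int pillsSeen = 0;
        while (pillsSeen < producers) { // stop only after one pill per producer
            String item = queue.take();
            if (item == POISON) {
                pillsSeen++;
            } else {
                consumed++;
            }
        }
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 4)); // prints 12
    }
}
```

Because each producer enqueues its pill after its last item and the queue is FIFO, no item can still be in the queue once the final pill has been taken.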
Code address: github.com/prepared48/…