Multi-threading comes up in many interview questions. For example:
"There are multiple parallel tasks, such as Task1 and Task2. How do you execute Task3 only after all of them have completed?"
Kotlin offers a variety of ways to implement this, all of which are summarized in this article:
1. Thread.join
2. Synchronized
3. ReentrantLock
4. BlockingQueue
5. CountDownLatch
6. CyclicBarrier
7. CAS
8. Future
9. CompletableFuture
10. RxJava
11. Coroutine
12. Flow
Task3 builds a string from the results returned by Task1 and Task2. Each task is simulated with sleep:
val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}
1. Thread.join()
Kotlin is fully interoperable with Java, so all of Java's threading tools are available out of the box. The simplest way to synchronize threads is Thread's join():
@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String
    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    task3(s1, s2)
}
2. Synchronized
Synchronization can also be done with a synchronized block:
@Test
fun test_synchronized() {
    lateinit var s1: String
    lateinit var s2: String
    Thread {
        synchronized(Unit) {
            s1 = task1()
        }
    }.start()
    s2 = task2()
    synchronized(Unit) {
        task3(s1, s2)
    }
}
However, with three or more parallel tasks, synchronized becomes awkward to write: n locks have to be declared and n synchronized blocks nested just to synchronize the results, as the sketch below shows.
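A rough sketch of that, assuming the same task1/task2/task3 and JUnit setup as the examples above, with both tasks moved onto worker threads so each needs its own monitor (the sleep before the nested blocks is a crude way to let the workers grab their locks first, which shows how fragile this pattern is):
@Test
fun test_synchronized_nested() {
    lateinit var s1: String
    lateinit var s2: String
    val lock1 = Any()
    val lock2 = Any()
    Thread { synchronized(lock1) { s1 = task1() } }.start()
    Thread { synchronized(lock2) { s2 = task2() } }.start()
    sleep(100) // crude: give both workers a chance to acquire their locks first
    synchronized(lock1) {           // one nested synchronized block per background task
        synchronized(lock2) {
            task3(s1, s2)
        }
    }
}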
3. ReentrantLock
ReentrantLock is a lock provided by JUC (java.util.concurrent) that can be used in place of synchronized:
@Test
fun test_ReentrantLock() {
    lateinit var s1: String
    lateinit var s2: String
    val lock = ReentrantLock()
    Thread {
        lock.lock()
        s1 = task1()
        lock.unlock()
    }.start()
    s2 = task2()
    lock.lock()
    task3(s1, s2)
    lock.unlock()
}
The benefit of ReentrantLock is that it avoids the nested-block problem of synchronized when there are multiple parallel tasks, but you still need to create a separate lock for each task.
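As a side note, Kotlin's standard library ships a withLock extension (kotlin.concurrent.withLock) that wraps lock()/unlock() in try/finally, so the unlock still happens if a task throws. A sketch of the same example using it, under the same task definitions as above:
// needs: import kotlin.concurrent.withLock
@Test
fun test_ReentrantLock_withLock() {
    lateinit var s1: String
    lateinit var s2: String
    val lock = ReentrantLock()
    Thread {
        lock.withLock { s1 = task1() }   // lock()/unlock() wrapped in try/finally
    }.start()
    s2 = task2()
    lock.withLock { task3(s1, s2) }
}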
4. BlockingQueue
Blocking queues are also implemented with locks internally, so they can be used for synchronization too:
@Test
fun test_blockingQueue() {
    lateinit var s1: String
    lateinit var s2: String
    val queue = SynchronousQueue<Unit>()
    Thread {
        s1 = task1()
        queue.put(Unit)
    }.start()
    s2 = task2()
    queue.take()
    task3(s1, s2)
}
Of course, blocking queues are more commonly used for synchronization in producer/consumer scenarios.
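For illustration only, a minimal producer/consumer sketch with an ArrayBlockingQueue, separate from the task setup above:
import java.util.concurrent.ArrayBlockingQueue

fun main() {
    val queue = ArrayBlockingQueue<Int>(5)   // bounded buffer shared by producer and consumer
    Thread {                                 // producer
        repeat(10) { i ->
            queue.put(i)                     // blocks while the buffer is full
            println("produced $i")
        }
    }.start()
    Thread {                                 // consumer
        repeat(10) {
            println("consumed ${queue.take()}")  // blocks while the buffer is empty
        }
    }.start()
}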
5. CountDownLatch
Most locks in JUC are built on AQS (AbstractQueuedSynchronizer), which supports both exclusive and shared modes. ReentrantLock is an exclusive lock; a shared lock is a better fit for this scenario. CountDownLatch, for example, keeps one thread blocked until the other threads have finished:
@Test
fun test_countdownlatch() {
    lateinit var s1: String
    lateinit var s2: String
    val cd = CountDownLatch(2)
    Thread {
        s1 = task1()
        cd.countDown()
    }.start()
    Thread {
        s2 = task2()
        cd.countDown()
    }.start()
    cd.await()
    task3(s1, s2)
}
The advantage of a shared lock is that you don't have to create a separate lock for each task, so the code stays simple even as the number of parallel tasks grows.
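For example, adding a hypothetical third parallel task (taskX below, assumed to return a String like the others) only requires bumping the counter to 3; a sketch under the same setup as above:
@Test
fun test_countdownlatch_three_tasks() {
    lateinit var s1: String
    lateinit var s2: String
    lateinit var sX: String
    val cd = CountDownLatch(3)                        // one count per parallel task
    Thread { s1 = task1(); cd.countDown() }.start()
    Thread { s2 = task2(); cd.countDown() }.start()
    Thread { sX = taskX(); cd.countDown() }.start()   // taskX is hypothetical
    cd.await()                                        // one wait point, no per-task locks
    task3(s1, s2)
    println("extra result: $sX")                      // task3 itself only takes two results
}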
6. CyclicBarrier
CyclicBarrier is another shared locking mechanism provided by JUC. It allows a group of threads to reach a synchronization point and then continue to run together. If any thread does not reach the synchronization point, the other arriving threads will be blocked.
Unlike CountDownLatch, which is one-shot, a CyclicBarrier can be reset and reused, which is where the name Cyclic comes from:
@Test
fun test_CyclicBarrier() {
    lateinit var s1: String
    lateinit var s2: String
    val cb = CyclicBarrier(3)
    Thread {
        s1 = task1()
        cb.await()
    }.start()
    Thread {
        s2 = task2()
        cb.await()
    }.start()
    cb.await()
    task3(s1, s2)
}
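The example above only uses the barrier once. A minimal sketch of the reuse that gives it the Cyclic name, independent of the task setup: two workers run three rounds, and the barrier resets itself after each round.
import java.util.concurrent.CyclicBarrier

fun main() {
    // The optional barrier action runs once each time all parties arrive.
    val barrier = CyclicBarrier(2) { println("--- round complete ---") }
    repeat(2) { worker ->
        Thread {
            repeat(3) { round ->
                println("worker $worker finished round $round")
                barrier.await()   // once both workers arrive, the barrier resets for the next round
            }
        }.start()
    }
}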
7. CAS
Internally, AQS synchronizes by spinning with CompareAndSwap (CAS), which avoids the overhead of blocking threads. We can therefore use a CAS-based atomic counter to go lock-free ourselves:
@Test
fun test_cas() {
    lateinit var s1: String
    lateinit var s2: String
    val cas = AtomicInteger(2)
    Thread {
        s1 = task1()
        cas.getAndDecrement()
    }.start()
    Thread {
        s2 = task2()
        cas.getAndDecrement()
    }.start()
    while (cas.get() != 0) {
        // spin until both tasks have counted down
    }
    task3(s1, s2)
}
Busy-waiting in a while loop may look like a waste of resources, but that is the nature of spin locks, so CAS is only suitable for synchronizing short, CPU-bound tasks.
volatile
Seeing the lock-free CAS implementation, many readers will think of volatile: can it also give us lock-free thread safety?
// The shared counter is presumably declared outside the test in the original, e.g.:
// @Volatile
// var cnt = 2
@Test
fun test_Volatile() {
    lateinit var s1: String
    lateinit var s2: String
    Thread {
        s1 = task1()
        cnt--
    }.start()
    Thread {
        s2 = task2()
        cnt--
    }.start()
    while (cnt != 0) {
    }
    task3(s1, s2)
}
volatile guarantees visibility but not atomicity: cnt-- is a compound read-modify-write, so it is not thread-safe and still needs a lock.
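A quick sketch of why, separate from the task setup: several threads decrementing a @Volatile counter in a loop typically lose updates, because each cnt-- is a separate read, decrement and write.
@Volatile
var sharedCounter = 100_000

fun main() {
    val threads = List(10) {
        Thread { repeat(10_000) { sharedCounter-- } }  // read-modify-write, not atomic
    }
    threads.forEach { it.start() }
    threads.forEach { it.join() }
    // Visibility is guaranteed, atomicity is not: this usually prints a value above 0.
    println("expected 0, got $sharedCounter")
}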
8. Future
Locked or lock-free, it is inconvenient to define two variables, s1 and s2, just to record the results. Since Java 1.5, Callable and Future have made it possible for a task to return its result when it finishes:
@Test
fun test_future() {
    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))
    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)
    task3(future1.get(), future2.get())
}
future.get() waits synchronously for the result, which keeps the code simple.
9. CompletableFuture
future.get() is convenient, but it blocks the calling thread. Java 8 introduced CompletableFuture, which implements both the Future and CompletionStage interfaces. A CompletableFuture can compose multiple CompletionStages, enabling complex asynchronous programming, and these composition methods use callbacks instead of blocking the thread:
@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
            task3(p1, p2)
        }.join()
}
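When more than two stages need combining, thenCombine starts to nest awkwardly; CompletableFuture.allOf waits for any number of futures. A sketch with the same two tasks (explicit lambdas used for the suppliers):
@Test
fun test_CompletableFuture_allOf() {
    val f1 = CompletableFuture.supplyAsync { task1() }
    val f2 = CompletableFuture.supplyAsync { task2() }
    CompletableFuture.allOf(f1, f2)                    // completes once every listed future completes
        .thenApply { task3(f1.join(), f2.join()) }     // these joins no longer block at this point
        .join()                                        // block the test thread until task3 is done
}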
10. RxJava
RxJava's operators and thread-switching capabilities can also help here: the zip operator combines the results of two Observables, and subscribeOn runs each task asynchronously:
@Test
fun test_Rxjava() {
    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()
}
11. Coroutine
Everything so far has really been a Java tool; coroutines are finally Kotlin's own:
@Test
fun test_coroutine() {
    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }
        val c2 = async(Dispatchers.IO) {
            task2()
        }
        task3(c1.await(), c2.await())
    }
}
It reads very naturally and combines the advantages of all the previous tools.
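And when the number of parallel tasks grows, awaitAll keeps the shape unchanged; a sketch with the same two tasks:
@Test
fun test_coroutine_awaitAll() {
    runBlocking {
        val results = listOf(
            async(Dispatchers.IO) { task1() },
            async(Dispatchers.IO) { task2() }
            // more async { ... } blocks can be added here with no extra ceremony
        ).awaitAll()                       // suspends until every Deferred has completed
        task3(results[0], results[1])
    }
}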
12. Flow
Flow is the coroutine-based answer to RxJava, with many of the same operators, such as zip:
@Test
fun test_flow() {
    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }
    runBlocking {
        flow1.zip(flow2) { t1, t2 ->
            task3(t1, t2)
        }.flowOn(Dispatchers.IO)
            .collect()
    }
}
flowOn shifts the upstream computation to Dispatchers.IO, so the tasks compute and emit their results asynchronously.
Conclusion
All of the approaches above are a bit like Kong Yiji's four ways of writing the character "hui" in "aniseed beans": there is no need to master every single one. The bottom line: the best thread-synchronization scheme in Kotlin starts with coroutines!