Today we introduce a concurrency utility whose main purpose is exchanging data between threads.

Exchanger

From the book: Exchanger is a utility class for exchanging data between threads. It provides a synchronization point at which two threads can swap data. If one thread calls exchange() first, it waits until the second thread also calls exchange(). Once both threads have reached the synchronization point, each one receives the data offered by the other.

An example

Take tonight's dinner as an example: A buys chicken wing rice, B buys a hamburger, and then they swap with each other. Here is the code:

    package thread.exchanger;

    import java.util.concurrent.Exchanger;

    /**
     * @author ZhaoWeinan
     * @date 2018/10/16
     * @description
     */
    public class ExchangerDemo {

        private static Exchanger<String> exchanger = new Exchanger<>();

        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String a = "I bought chicken wing rice!";
                    System.out.println(Thread.currentThread().getName() + " says: " + a);
                    try {
                        System.out.println(Thread.currentThread().getName() + " is waiting to exchange");
                        // Blocks here until the other thread also calls exchange()
                        exchanger.exchange(a);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    String b = "I bought a hamburger!";
                    System.out.println(Thread.currentThread().getName() + " says: " + b);
                    try {
                        System.out.println(Thread.currentThread().getName() + " is waiting to exchange");
                        // Hands over b and receives the value offered by the first thread
                        String a = exchanger.exchange(b);
                        System.out.println(Thread.currentThread().getName() + " says: I got the chicken wing rice!");
                        System.out.println(Thread.currentThread().getName() + " received: " + a);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

This code models the scenario well. It creates two threads. Thread one defines a variable a holding "I bought chicken wing rice!", and thread two defines a variable b holding "I bought a hamburger!". Thread one calls exchanger.exchange(a) and blocks, waiting at the synchronization point. When thread two calls exchanger.exchange(b), both threads have reached the synchronization point. Thread one therefore receives "I bought a hamburger!" from thread two, and thread two receives "I bought chicken wing rice!" from thread one. The result of running the code is as follows:

That is all for the example. Next, let's look at the source code.

Look at the source

Briefly, there are two main inner classes: Node and Participant.

The Node class
    @sun.misc.Contended static final class Node {
        // Index in the arena
        int index;
        // Last recorded value of Exchanger.bound (not volatile)
        int bound;
        // Number of CAS failures at the current bound (not volatile)
        int collides;
        // Pseudo-random number used while spinning
        int hash;
        // The item this thread offers for exchange
        Object item;
        // The item provided by the thread that releases this one; volatile
        volatile Object match;
        // Set to this thread while it is parked (suspended), otherwise null; volatile
        volatile Thread parked;
    }

The Node class mainly holds the data being exchanged. The Exchanger class has a slot field of type Node and an arena field of type Node[]. They correspond to two modes of exchange, single-slot mode and multi-slot mode.

The Participant class
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() {
            return new Node();
        }
    }

The Participant class extends ThreadLocal. Its main job is to create one Node object per thread. Next, let's look at the core exchange method.
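To make the per-thread behavior concrete, here is a minimal sketch. The Node class in it is a hypothetical stand-in (not the real Exchanger.Node), and ParticipantSketch and nodeSeenByNewThread are names invented for illustration. It only shows that a ThreadLocal with initialValue() hands every thread its own lazily created object.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ParticipantSketch {
    // Hypothetical stand-in for Exchanger.Node, only here to have something to allocate
    static final class Node {
        Object item;
    }

    // Same shape as Exchanger.Participant: one lazily created Node per thread
    static final class Participant extends ThreadLocal<Node> {
        @Override
        public Node initialValue() {
            return new Node();
        }
    }

    static final Participant participant = new Participant();

    // Starts a new thread and returns the Node that thread gets from participant.get()
    static Node nodeSeenByNewThread() {
        AtomicReference<Node> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(participant.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Node mine = participant.get();
        // Repeated calls on the same thread return the same Node
        System.out.println("same thread, same node: " + (mine == participant.get()));
        // Another thread gets a Node of its own
        System.out.println("other thread shares it: " + (mine == nodeSeenByNewThread()));
    }
}
```

This is why slotExchange can call participant.get() without any locking: the returned Node belongs to the calling thread until it is published through the slot.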

slotExchange: exchange in single-slot mode

For ease of explanation, I will not post the whole method at once but go through it piece by piece.

    Node p = participant.get();
    Thread t = Thread.currentThread();

This gets the current thread's Node object p and the current thread t.

    for (Node q;;) {
        if ((q = slot) != null) {
            .........
        }
    }

If the slot field is not null, another thread is already waiting to exchange data.

    if (U.compareAndSwapObject(this, SLOT, q, null)) {
        Object v = q.item;
        q.match = item;
        Thread w = q.parked;
        if (w != null)
            U.unpark(w);
        return v;
    }

In this code, compareAndSwapObject is a method of the sun.misc.Unsafe class:

    /**
     * Compares the object field at the given offset within the object with the
     * expected value and updates it if they are equal. The operation is atomic,
     * so it provides an uninterruptible way to update an object field.
     *
     * @param var1 the object containing the field to modify
     * @param var2 the offset of the field within the object
     * @param var4 the value expected to be in the field
     * @param var5 the new value to store if the field currently equals var4
     * @return true if the field value was changed
     */
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

Here, this is the Exchanger instance. The call checks whether the instance's slot field is equal to q. If it is, the field is set to null and the call returns true. Otherwise nothing is updated and it returns false. How is SLOT defined?

    private static final long SLOT;
    ...
    SLOT = U.objectFieldOffset(ek.getDeclaredField("slot"));

SLOT is the offset of the "slot" field, so the compare-and-swap operates on the slot field of the Exchanger instance.
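The same compare-and-swap idea can be tried out without sun.misc.Unsafe. The sketch below is an assumption-laden illustration, not the Exchanger implementation: it uses AtomicReferenceFieldUpdater from java.util.concurrent.atomic in place of the SLOT offset, and SlotCasSketch, tryOccupy and tryTake are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class SlotCasSketch {
    // Hypothetical stand-in for Exchanger.Node
    static final class Node {
        final Object item;
        Node(Object item) { this.item = item; }
    }

    // The field must be volatile for AtomicReferenceFieldUpdater to accept it
    volatile Node slot;

    // Plays the role of the SLOT offset: it identifies which field the CAS targets
    static final AtomicReferenceFieldUpdater<SlotCasSketch, Node> SLOT =
            AtomicReferenceFieldUpdater.newUpdater(SlotCasSketch.class, Node.class, "slot");

    // Analogous to compareAndSwapObject(this, SLOT, null, p): occupy an empty slot
    boolean tryOccupy(Node p) {
        return SLOT.compareAndSet(this, null, p);
    }

    // Analogous to compareAndSwapObject(this, SLOT, q, null): take a waiting node
    boolean tryTake(Node q) {
        return SLOT.compareAndSet(this, q, null);
    }

    public static void main(String[] args) {
        SlotCasSketch ex = new SlotCasSketch();
        Node first = new Node("chicken wing rice");
        Node second = new Node("hamburger");
        System.out.println(ex.tryOccupy(first));   // slot was null, so the CAS succeeds
        System.out.println(ex.tryOccupy(second));  // slot is no longer null, so it fails
        System.out.println(ex.tryTake(second));    // slot holds first, not second, so it fails
        System.out.println(ex.tryTake(first));     // expected value matches, slot becomes null
    }
}
```

Only one of several competing threads can win each compareAndSet, which is what lets the slot be claimed and released without a lock.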

    if (U.compareAndSwapObject(this, SLOT, q, null)) {
        Object v = q.item;
        q.match = item;
        Thread w = q.parked;
        if (w != null)
            U.unpark(w);
        return v;
    }

If compareAndSwapObject succeeds, the current thread takes the waiting thread's item, hands over its own item through q.match, wakes the waiting thread if it is parked, and returns the item it received. If it does not succeed, let's look at the rest of the loop, including the else blocks:

    for (Node q;;) {
        if ((q = slot) != null) {
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                Object v = q.item;
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // The CAS failed, so threads are contending: on a multi-CPU machine,
            // create the multi-slot arena, but keep looping until slot is null
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        else if (arena != null)
            // The arena has been created: return null so the caller switches to multi-slot mode
            return null;
        else {
            // Another compareAndSwapObject, but note that the arguments are different:
            // here the current thread tries to occupy the empty slot with its own node
            p.item = item;
            if (U.compareAndSwapObject(this, SLOT, null, p))
                break;
            p.item = null;
        }
    }

If threads contend for the slot, a multi-slot arena is created to relieve the contention. Otherwise, if the slot is empty, the current thread tries to put its own node into it. If that succeeds it breaks out of the loop, and if not, the loop repeats the process above indefinitely.

    // The current thread occupies the slot and waits for another thread to exchange data
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0L;
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // Loop until match is set, i.e. no longer null
    while ((v = p.match) == null) {
        if (spins > 0) {
            // Spin
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
            if (h == 0)
                h = SPINS | (int)t.getId();
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                // Occasionally yield so that other threads are not kept waiting
                Thread.yield();
        }
        else if (slot != p)
            spins = SPINS;
        // The thread is not interrupted, it is still in single-slot mode,
        // and it has not timed out
        else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
            // Set the BLOCKER
            U.putObject(t, BLOCKER, this);
            p.parked = t;
            if (slot == p)
                // Block
                U.park(false, ns);
            p.parked = null;
            // Clear the BLOCKER
            U.putObject(t, BLOCKER, null);
        }
        else if (U.compareAndSwapObject(this, SLOT, p, null)) {
            // The CAS removed our node from the slot, so leave the loop:
            // TIMED_OUT on timeout, null if the thread was interrupted
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    U.putOrderedObject(p, MATCH, null);
    p.item = null;
    p.hash = h;
    // v is the exchanged item, TIMED_OUT, or null if the thread was interrupted
    return v;

To summarize: the current thread occupies the slot and waits for another thread to exchange data, spinning first and then parking. When another thread arrives, it removes the waiting node from the slot with compareAndSwapObject, takes its item, hands over its own item through match, and wakes the parked thread, so each side returns what the other offered. If the wait times out, slotExchange returns the TIMED_OUT marker, and if the thread is interrupted, it returns null. That is single-slot mode. I have not studied multi-slot mode in depth, but I believe the principle is similar. The article is already rather long, so I will not cover it here.
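The single-slot handoff described above can be condensed into a small sketch. This is a simplified illustration under stated assumptions, not the JDK code: it uses AtomicReference and LockSupport instead of Unsafe, leaves out spinning, timeouts, interruption handling and the arena, and requires non-null items. SingleSlotSketch and swap are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class SingleSlotSketch<V> {
    private static final class Node {
        Object item;             // what the waiting thread offers
        volatile Object match;   // what the arriving thread hands back
        volatile Thread parked;  // the waiting thread, once it is ready to park
    }

    private final AtomicReference<Node> slot = new AtomicReference<>();

    @SuppressWarnings("unchecked")
    public V exchange(V item) {
        if (item == null)
            throw new NullPointerException("this sketch needs non-null items");
        Node p = new Node();
        for (;;) {
            Node q = slot.get();
            if (q != null) {
                // Someone is waiting: try to take their node out of the slot
                if (slot.compareAndSet(q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        LockSupport.unpark(w);
                    return (V) v;
                }
            } else {
                // The slot is empty: try to occupy it with our own node
                p.item = item;
                if (slot.compareAndSet(null, p))
                    break;
            }
        }
        // We occupy the slot: park until the other thread sets match.
        // An interrupted thread would just spin here, since park returns at once.
        p.parked = Thread.currentThread();
        Object v;
        while ((v = p.match) == null)
            LockSupport.park(this);
        return (V) v;
    }

    // Runs one exchange between the calling thread and a helper thread
    static String[] swap(String left, String right) {
        SingleSlotSketch<String> ex = new SingleSlotSketch<>();
        String[] received = new String[2];
        Thread other = new Thread(() -> received[1] = ex.exchange(right));
        other.start();
        received[0] = ex.exchange(left);
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        String[] r = swap("chicken wing rice", "hamburger");
        System.out.println("caller got " + r[0] + ", helper got " + r[1]);
    }
}
```

The order of operations matters: the arriving thread writes match before reading parked, and the waiting thread writes parked before reading match, so either the waiter sees the match or the arriver sees the thread to unpark.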

Application scenarios

As for application scenarios, the book says Exchanger can be used for synchronizing task queues, for genetic algorithms (which I admit I have not heard of being done this way), and for data proofreading (this one seems more plausible to me). The first time I saw it was in code I inherited from someone else: a scheduled task had to compute summaries from many MySQL tables and write them into a summary table, and Exchanger was used there. There are many other ways to implement that, though, so it did not leave a strong impression. If you have a good use case, let's talk about it.
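As a rough illustration of the data proofreading idea, here is a minimal sketch using the real java.util.concurrent.Exchanger. The scenario is an assumption made up for the example: two threads compute the same total in different ways, exchange their results, and each checks that the other's value matches its own. CrossCheckSketch, crossCheck and swapAndCompare are hypothetical names.

```java
import java.util.concurrent.Exchanger;

public class CrossCheckSketch {
    // Hands over our result, receives the other thread's result, and compares them
    static boolean swapAndCompare(Exchanger<Long> exchanger, long mine) {
        try {
            long theirs = exchanger.exchange(mine);
            return theirs == mine;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Two threads sum the same data in opposite directions and cross-check the totals
    static boolean crossCheck(int[] data) {
        Exchanger<Long> exchanger = new Exchanger<>();
        boolean[] agreed = new boolean[2];

        Thread forward = new Thread(() -> {
            long sum = 0;
            for (int i = 0; i < data.length; i++)
                sum += data[i];
            agreed[0] = swapAndCompare(exchanger, sum);
        });
        Thread backward = new Thread(() -> {
            long sum = 0;
            for (int i = data.length - 1; i >= 0; i--)
                sum += data[i];
            agreed[1] = swapAndCompare(exchanger, sum);
        });

        forward.start();
        backward.start();
        try {
            forward.join();
            backward.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return agreed[0] && agreed[1];
    }

    public static void main(String[] args) {
        System.out.println("results agree: " + crossCheck(new int[] {1, 2, 3, 4}));
    }
}
```

In a real proofreading job the two sides would typically read from different sources, for example two tables that are supposed to hold the same totals, and the exchange is the point where both results are known to be ready.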

You are welcome to get in touch and point out any mistakes in this article so that I can deepen my understanding. I hope your code has no bugs. Thank you!