This article is included in the GitHub repository org_Hejianhui/JavaStudy.
Preface
In the JUC package, besides the commonly used concurrency tools (ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore), there is a less commonly used thread synchronizer class: Exchanger.
Exchanger is a concurrency tool for exchanging data between two threads. It provides a synchronization point: once both threads have reached it by calling the exchange method, they swap data. Whichever thread arrives first waits for the other, and a wait timeout can be set.
What is Exchanger?
Exchanger provides a synchronization point at which two threads can exchange data with each other. The two threads exchange data by calling the exchange method. If the first thread calls exchange first, it waits until the second thread also calls exchange. When both threads have reached the synchronization point, each passes the data it produced to the other. The key point when using Exchanger is therefore that threads call exchange() in pairs: whenever a pair of threads reaches the synchronization point, their data is exchanged.
In the words of the JDK documentation, Exchanger is a synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
In short, Exchanger is a utility class for exchanging data between two worker threads. When one thread has finished some work and wants to exchange data with another thread, the thread that arrives first waits until the second thread arrives with its data, and then the two swap.
Exchanger usage
- Exchanger<V>: a generic type, where V is the type of the objects that can be exchanged
- V exchange(V x): waits for another thread to arrive at the exchange point (unless the current thread is interrupted), then transfers the given object to that thread and receives that thread's object in return.
- V exchange(V x, long timeout, TimeUnit unit): waits for another thread to arrive at the exchange point (unless the current thread is interrupted or the specified waiting time elapses), then transfers the given object to that thread and receives that thread's object in return.
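As a minimal sketch of the two methods listed above, the following program swaps one string between two threads and then shows a lone thread giving up after a timeout. The class name ExchangerBasics and its helper methods are invented for illustration; only Exchanger, TimeUnit and TimeoutException come from the JDK.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExchangerBasics {

    /** Runs two threads that swap one string each; returns {received by A, received by B}. */
    static String[] swapOnce(String fromA, String fromB) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];
        Thread b = new Thread(() -> {
            try {
                received[1] = exchanger.exchange(fromB);   // blocks until A arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        b.start();
        try {
            received[0] = exchanger.exchange(fromA);       // blocks until B arrives
            b.join();                                      // makes B's write to received[1] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    /** Returns true if a timed exchange with no partner ends in a TimeoutException. */
    static boolean timesOutAlone(long millis) {
        try {
            new Exchanger<String>().exchange("no partner", millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        String[] r = swapOnce("from-A", "from-B");
        System.out.println("A received " + r[0] + ", B received " + r[1]);
        System.out.println("lone thread timed out: " + timesOutAlone(100));
    }
}
```

Each side gets the other side's object as the return value of its own exchange call, which is what makes the handoff bidirectional.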
Application scenarios
Exchanger can be used in genetic algorithms: two individuals are selected as mating partners, their data is exchanged, and a crossover rule is applied to obtain two offspring.
Exchanger can also be used for proofreading. For example, paper bank statements have to be keyed in manually as electronic bank statements. To avoid mistakes, two clerks, A and B, each enter the same data. After both have entered it into Excel, the system loads the two Excel files and compares their data to check whether the entries are consistent.
A typical application scenario is as follows: one task creates objects that are expensive to produce, and another task consumes them. By handing them over through an Exchanger, objects can be created and consumed at the same time.
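The producer and consumer scenario above is often implemented by swapping buffers: the producer hands over a full buffer and gets an empty one back. The following is a sketch under simple assumptions; the class name BufferSwap, the integer payload and the round counts are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class BufferSwap {

    /** The producer fills `rounds` buffers of `size` ints; returns the sum seen by the consumer. */
    static long run(int rounds, int size) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        long[] sum = new long[1];

        Thread producer = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            int next = 1;
            try {
                for (int r = 0; r < rounds; r++) {
                    for (int k = 0; k < size; k++) {
                        buffer.add(next++);               // stand-in for an expensive object
                    }
                    buffer = exchanger.exchange(buffer);  // hand over full, get back empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            try {
                for (int r = 0; r < rounds; r++) {
                    buffer = exchanger.exchange(buffer);  // hand over empty, get back full
                    for (int value : buffer) {
                        sum[0] += value;
                    }
                    buffer.clear();                       // empty it before the next swap
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("consumer summed " + run(3, 4));
    }
}
```

Because only two buffers ever exist, the producer can fill one while the consumer drains the other, and no new list is allocated per round.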
Case description
An Exchanger exchanges data between two threads at a time, although more than two threads may share the same Exchanger. A test example:
// Needs java.util.concurrent.* and RandomStringUtils from Apache Commons Lang
private static void test1() throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();
    CountDownLatch countDownLatch = new CountDownLatch(5);
    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            try {
                String origMsg = RandomStringUtils.randomNumeric(6);
                // The thread that arrives first waits until another thread exchanges data with it, or until the timeout expires
                String exchangeMsg = exchanger.exchange(origMsg, 5, TimeUnit.SECONDS);
                System.out.println(Thread.currentThread().getName() + "\t origMsg:" + origMsg + "\t exchangeMsg:" + exchangeMsg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }, String.valueOf(i)).start();
    }
    countDownLatch.await();
}
With five threads, four of them pair up and the one left over has no partner, so it waits until the timeout expires. The output is as follows:
0 origMsg:524053 exchangeMsg:098544
3 origMsg:433246 exchangeMsg:956604
4 origMsg:098544 exchangeMsg:524053
1 origMsg:956604 exchangeMsg:433246
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.nuih.juc.ExchangerDemo.lambda$test1$0(ExchangerDemo.java:37)
at java.lang.Thread.run(Thread.java:748)
The test case above is relatively simple. To observe the behavior of Exchanger more closely, we can simulate a message consumption scenario, as in the following example:
private static void test2() throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();
    CountDownLatch countDownLatch = new CountDownLatch(4);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    // Producer
    Runnable producer = new Runnable() {
        @Override
        public void run() {
            try {
                cyclicBarrier.await();
                for (int i = 0; i < 5; i++) {
                    String msg = RandomStringUtils.randomNumeric(6);
                    exchanger.exchange(msg, 5, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName() + "\t producer msg -> " + msg + " ,\t i -> " + i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    };

    // Consumer
    Runnable consumer = new Runnable() {
        @Override
        public void run() {
            try {
                cyclicBarrier.await();
                for (int i = 0; i < 5; i++) {
                    String msg = exchanger.exchange(null, 5, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName() + "\t consumer msg -> " + msg + ",\t" + i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    };

    for (int i = 0; i < 2; i++) {
        new Thread(producer).start();
        new Thread(consumer).start();
    }
    countDownLatch.await();
}
The number of producer and consumer threads is the same, and the number of loops is the same, but there is still a wait timeout:
Thread-3 consumer msg -> null, 0
Thread-1 consumer msg -> null, 0
Thread-1 consumer msg -> null, 1
Thread-2 producer msg -> 640010 , i -> 0
Thread-2 producer msg -> 733133 , i -> 1
Thread-3 consumer msg -> null, 1
Thread-3 consumer msg -> 476520, 2
Thread-1 consumer msg -> 640010, 2
Thread-1 consumer msg -> null, 3
Thread-0 producer msg -> 993414 , i -> 0
Thread-0 producer msg -> 292745 , i -> 1
Thread-2 producer msg -> 476520 , i -> 2
Thread-2 producer msg -> 408446 , i -> 3
Thread-3 consumer msg -> null, 3
Thread-1 consumer msg -> 292745, 4
Thread-2 producer msg -> 251971 , i -> 4
Thread-0 producer msg -> 078939 , i -> 2
Thread-3 consumer msg -> 251971, 4
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.nuih.juc.ExchangerDemo$1.run(ExchangerDemo.java:70)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 0
This wait timeout occurs only with some probability. Why?
Because of uneven system scheduling and the heavy spin waiting inside Exchanger, the four threads do not complete the same number of successful exchange calls. The output also shows that consumer threads are not matched with producer threads as one might expect: a consumer that receives null has been matched with another consumer, which means producers are being matched with other producers as well. Why? Exchanger does not care about the role of a thread. Which two threads are matched is determined purely by scheduling: whichever two threads the CPU runs at the same time, or one right after the other, are matched with each other.
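One way to avoid this role-agnostic matching, sketched here as a suggestion rather than as part of the original test, is to give each producer and consumer pair its own Exchanger so that the two can only ever match each other. The class name PairedExchange, the method runPair and the message prefix are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

public class PairedExchange {

    /** One producer and one consumer share a private Exchanger; returns what the consumer received. */
    static List<String> runPair(String prefix, int count) {
        Exchanger<String> exchanger = new Exchanger<>();   // private to this pair
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    exchanger.exchange(prefix + i, 5, TimeUnit.SECONDS);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(exchanger.exchange(null, 5, TimeUnit.SECONDS));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runPair("msg-", 5));
    }
}
```

With exactly two threads per Exchanger, the i-th call of the producer can only pair with the i-th call of the consumer, so neither null results nor leftover threads occur.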
Source code analysis
(Figure: Exchanger class diagram)
Its main internal variables and methods are as follows:
Member attributes
// ThreadLocal subclass: each thread has its own Node
private final Participant participant;
// Used under high concurrency: holds the Node instances waiting to be matched
private volatile Node[] arena;
// Used under low concurrency, while arena is still uninitialized: the Node waiting to be matched
private volatile Node slot;
// Initially 0. Once arena is created, bound & MMASK is the largest valid index of the arena array;
// the higher bits are a version number that changes on every update.
// The index grows as concurrency increases, up to the maximum value FULL,
// and shrinks back to 0 as the waiting threads are matched one by one
private volatile int bound;
There are also several static properties that represent field offsets, initialized with a static code block as follows:
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long BOUND;
private static final long SLOT;
private static final long MATCH;
private static final long BLOCKER;
private static final int ABASE;
static {
    int s;
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> ek = Exchanger.class;
        Class<?> nk = Node.class;
        Class<?> ak = Node[].class;
        Class<?> tk = Thread.class;
        BOUND = U.objectFieldOffset
            (ek.getDeclaredField("bound"));
        SLOT = U.objectFieldOffset
            (ek.getDeclaredField("slot"));
        MATCH = U.objectFieldOffset
            (nk.getDeclaredField("match"));
        BLOCKER = U.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        s = U.arrayIndexScale(ak);
        // ABASE absorbs padding in front of element 0
        ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
    } catch (Exception e) {
        throw new Error(e);
    }
    if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
        throw new Error("Unsupported array scale");
}
Several static constants are also defined:
// The byte distance (as a shift value) between any two used slots in the arena.
// 1 << ASHIFT (128 bytes) should be at least the cache-line size, so that two used slots
// never fall into the same cache line. The arena is over-allocated by the same factor,
// and the elements between two used slots are left empty.
private static final int ASHIFT = 7;
// The maximum supported arena index is 255
private static final int MMASK = 0xff;
// Unit for the version bits of bound: every successful change to bound also adds SEQ
private static final int SEQ = MMASK + 1;
// The number of CPU cores
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// The maximum slot index of the arena. Since threads match in pairs, it is the number of cores divided by 2, capped at MMASK
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
// The bound on the number of spins while waiting for a match
private static final int SPINS = 1 << 10;
// Stands in for null: a null argument is stored as this object, and this object is returned to the caller as null
private static final Object NULL_ITEM = new Object();
// Returned by the internal exchange methods when the exchange fails because the wait timed out
private static final Object TIMED_OUT = new Object();
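To make the sizing constants concrete, the following sketch evaluates the same expressions for a few CPU counts. The class ArenaSizing and its methods are illustrative only; the constant values and the expression (FULL + 2) << ASHIFT are copied from the source shown in this article, with the CPU count passed in as a parameter instead of being read from the runtime.

```java
public class ArenaSizing {
    static final int ASHIFT = 7;
    static final int MMASK = 0xff;

    /** Same expression as Exchanger.FULL, with the CPU count passed in. */
    static int full(int ncpu) {
        return (ncpu >= (MMASK << 1)) ? MMASK : ncpu >>> 1;
    }

    /** Length of the arena array as allocated in slotExchange: (FULL + 2) << ASHIFT. */
    static int arenaLength(int ncpu) {
        return (full(ncpu) + 2) << ASHIFT;
    }

    /** Byte offset of logical slot i relative to ABASE: i << ASHIFT. */
    static long slotByteOffset(int i) {
        return (long) i << ASHIFT;
    }

    public static void main(String[] args) {
        for (int ncpu : new int[] {2, 8, 64, 1024}) {
            System.out.println("NCPU=" + ncpu
                    + " FULL=" + full(ncpu)
                    + " arena length=" + arenaLength(ncpu));
        }
        System.out.println("slot 3 starts " + slotByteOffset(3) + " bytes after ABASE");
    }
}
```

On an 8-core machine this gives FULL = 4 and an arena of 768 references, of which only a handful are ever used; the rest is padding that keeps used slots 128 bytes apart.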
Inner classes
The Exchanger class has two inner classes, Node and Participant. Participant extends ThreadLocal and overrides its initialValue method to return a new Node object. They are defined as follows:
@sun.misc.Contended static final class Node {
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}
/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}
The @sun.misc.Contended annotation pads the class to avoid false sharing between objects that would otherwise share a cache line. The fields of Node are:
- index records the thread's current index into the arena array
- bound records the last observed value of Exchanger.bound
- collides records the number of CAS failures while bound has stayed unchanged
- hash is the pseudo-random value used while spin-waiting
- item is the object that the current thread offers for exchange
- match is the object received from the other thread. If match is not null, the exchange has succeeded
- parked is the thread blocked on this Node, or null if the thread is not parked
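The per-thread Node pattern can be tried in isolation. The sketch below uses a cut-down Node with only two fields and a demo class named PerThreadNode, both invented for illustration. It checks that one thread always gets the same Node back while another thread gets a different one.

```java
public class PerThreadNode {

    /** Cut-down stand-in for Exchanger.Node. */
    static final class Node {
        int hash;
        Object item;
    }

    /** Same idea as Exchanger.Participant: create the Node lazily, once per thread. */
    static final class Participant extends ThreadLocal<Node> {
        @Override
        protected Node initialValue() {
            return new Node();
        }
    }

    static final Participant PARTICIPANT = new Participant();

    /** Repeated get() calls on one thread return the same Node, so its fields can be reused. */
    static boolean sameNodeOnSameThread() {
        return PARTICIPANT.get() == PARTICIPANT.get();
    }

    /** Another thread gets its own Node, so there is no sharing between threads. */
    static boolean differentNodeOnOtherThread() {
        Node mine = PARTICIPANT.get();
        Node[] other = new Node[1];
        Thread t = new Thread(() -> other[0] = PARTICIPANT.get());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return other[0] != null && other[0] != mine;
    }

    public static void main(String[] args) {
        System.out.println("same thread, same node: " + sameNodeOnSameThread());
        System.out.println("other thread, other node: " + differentNodeOnOtherThread());
    }
}
```

Reusing one Node per thread is what allows fields such as hash and index to carry over from one exchange call to the next.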
Important methods
exchange() method
@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
    Object v;
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    if ((arena != null ||                                // arena is null: try slotExchange first
         (v = slotExchange(item, false, 0L)) == null) &&
        // slotExchange returned null, or arena already exists: continue with arenaExchange
        ((Thread.interrupted() ||                        // not interrupted: try arenaExchange
          (v = arenaExchange(item, false, 0L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V)v;
}
Execution steps of the exchange method:
- If arena is null, slotExchange is tried first. If it returns a result, that result is returned directly; if it returns null, arenaExchange is tried next.
- If the slot is contended, so that arena is already non-null, arenaExchange is executed directly.
- The returned value v is the data item of the partner thread.
- In summary: if thread A calls first, A's item is stored in item and B's item is stored in match.
- When threads do not contend for the Exchanger, slotExchange is sufficient. slot is a single Node object.
- Under contention one slot is not enough, and the Node array arena is used.
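The first and last lines of exchange translate null to and from NULL_ITEM. Internally a null match means that no partner has arrived yet, so a genuine null item needs a stand-in. A minimal sketch of that translation, with the class name NullSentinel and the method names encode and decode chosen for illustration:

```java
public class NullSentinel {
    // Private marker object: no caller can ever pass this exact instance in.
    private static final Object NULL_ITEM = new Object();

    /** What exchange does on entry: replace a null argument with the marker. */
    static Object encode(Object x) {
        return (x == null) ? NULL_ITEM : x;
    }

    /** What exchange does on return: turn the marker back into null. */
    static Object decode(Object v) {
        return (v == NULL_ITEM) ? null : v;
    }

    public static void main(String[] args) {
        Object stored = encode(null);
        System.out.println("stored value is non-null: " + (stored != null));
        System.out.println("decoded back to null: " + (decode(stored) == null));
        System.out.println("ordinary values pass through: " + decode(encode("data")));
    }
}
```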
slotExchange() method
When slotExchange is called and the slot field is null, the current thread tries to CAS slot from null to its own Node. If the CAS fails, the next iteration of the for loop follows the logic for a non-null slot. If the CAS succeeds, the thread spin-waits; after a certain number of spins it parks itself through the Unsafe park method, either indefinitely or for the specified time. When it wakes up, it checks the match field of its own Node. If match is not null, the exchange succeeded and the matched object is returned; otherwise null or TIMED_OUT is returned. Before returning, the item and match fields are reset to null, and the hash value computed during spinning is saved for the next call to slotExchange.
When slotExchange is called and the slot field is not null, the current thread tries to CAS it to null. If the CAS succeeds, the current thread is matched with the thread that owns the Node in slot: it reads that Node's item field, stores its own object in that Node's match field, and wakes up the thread recorded in that Node's parked field. If the CAS fails, another thread is competing for the slot, so the arena field is initialized. On a later iteration of the for loop, once slot is null again, the method returns null because arena is no longer null, and the exchange is then completed through arenaExchange.
// Called when arena is null; a null return value means the exchange did not complete here
// item is the object to exchange, timed says whether the wait is limited (false means wait indefinitely), ns is the wait time
private final Object slotExchange(Object item, boolean timed, long ns) {
    // Get the Node associated with the current thread
    Node p = participant.get();
    Thread t = Thread.currentThread();
    // Interrupted: return null
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
    for (Node q;;) {
        if ((q = slot) != null) { // slot is not null
            // CAS slot to null: the thread that owns the Node in slot is matched with the current thread
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                Object v = q.item;
                // Hand over our item to complete the exchange
                q.match = item;
                // Wake up the thread parked on q, if any
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // The CAS failed: another thread took the slot first. This happens when several threads call exchange at once
            // bound == 0 means the arena has not been initialized yet
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        else if (arena != null)
            return null; // arena is not null: the caller continues with arenaExchange
        else {
            // slot and arena are both null
            p.item = item;
            // CAS slot from null to p. On success, leave the loop
            if (U.compareAndSwapObject(this, SLOT, null, p))
                break;
            // On failure, reset item to null and go around the for loop again
            p.item = null;
        }
    }
    // slot was successfully changed to p: wait for a partner
    int h = p.hash; // hash starts at 0
    long end = timed ? System.nanoTime() + ns : 0L;
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // match holds the object received from the other thread. A non-null value means the exchange succeeded
    while ((v = p.match) == null) {
        // Spin-wait
        if (spins > 0) {
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
            if (h == 0)
                h = SPINS | (int)t.getId(); // initialize h
            // spins is decremented only when the generated h is negative
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                Thread.yield();
        }
        // slot has changed, so a partner has claimed this Node but has not set match yet
        // (the partner clears slot before it writes match, and scheduling can delay the second step):
        // spin again and re-read match
        else if (slot != p)
            spins = SPINS;
        // The thread is not interrupted, arena is null, and the wait has not timed out
        else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
            U.putObject(t, BLOCKER, this);
            p.parked = t;
            if (slot == p)
                U.park(false, ns);
            // The thread has been woken up and goes around the while loop again
            // If it woke up because of a timeout, the next iteration enters the else-if branch below and returns TIMED_OUT
            p.parked = null;
            U.putObject(t, BLOCKER, null);
        }
        // Timed out or interrupted: CAS slot from p back to null
        else if (U.compareAndSwapObject(this, SLOT, p, null)) {
            // If timed is false (indefinite wait), the thread got here because it was interrupted: return null
            // If timed is true, return TIMED_OUT on timeout, or null if interrupted
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // Reset match to null
    U.putOrderedObject(p, MATCH, null);
    // Reset item
    p.item = null;
    // Save h so that the next exchange does not start from the initial value 0
    p.hash = h;
    return v;
}
To summarize the logic implemented above:
- Exchanger pools its Node objects by keeping one per thread in a ThreadLocal. The Node encapsulates the key data: the item, the matched item, the parked thread, and so on.
- When the first thread arrives, it puts its data into the item field of its pooled Node, installs that Node in slot, and blocks itself (usually not immediately; it first spins for a while, occasionally calling yield), waiting for a partner to fill in match.
- When the second thread arrives, it takes the value stored in item of the Node in slot, assigns its own value to that Node's match, and wakes up the thread that blocked earlier.
- When the first thread wakes up, it reads the value in match, resets the fields of its Node, and returns the data it received.
- If the wait times out, the TIMED_OUT object is returned.
- If the thread is interrupted, null is returned.
Apart from TIMED_OUT, this method returns either a valid item or null. It returns null in two cases: threads contended for the slot and the arena array was created, or the thread was interrupted.
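The single-slot protocol summarized above can be approximated with public APIs. The sketch below is a simplification, not the JDK implementation: it uses AtomicReference and LockSupport instead of Unsafe, allocates a fresh Node per call instead of reusing a ThreadLocal one, and leaves out spinning, timeouts and the arena. The class name SingleSlotExchanger and the helper swap are invented for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class SingleSlotExchanger<V> {

    private static final class Node {
        Object item;                 // what the waiting thread offers
        volatile Object match;       // what the partner hands back
        volatile Thread parked;      // the waiting thread while it is parked
    }

    private static final Object NULL_ITEM = new Object();
    private final AtomicReference<Node> slot = new AtomicReference<>();

    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object item = (x == null) ? NULL_ITEM : x;
        Thread t = Thread.currentThread();
        Node p = new Node();         // simplification: the JDK reuses one Node per thread
        p.item = item;
        for (;;) {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node q = slot.get();
            if (q != null) {
                if (slot.compareAndSet(q, null)) {    // second arrival: claim the waiter
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        LockSupport.unpark(w);
                    return (v == NULL_ITEM) ? null : (V) v;
                }
            } else if (slot.compareAndSet(null, p)) { // first arrival: publish our Node
                break;
            }
        }
        Object v;
        while ((v = p.match) == null) {
            if (t.isInterrupted()) {
                if (slot.compareAndSet(p, null)) {    // withdraw the offer
                    Thread.interrupted();             // clear the flag before throwing
                    throw new InterruptedException();
                }
                continue;            // a partner already claimed us; match will be set shortly
            }
            p.parked = t;
            if (slot.get() == p)     // park only if nobody has claimed us yet
                LockSupport.park(this);
            p.parked = null;
        }
        return (v == NULL_ITEM) ? null : (V) v;
    }

    /** Demo helper: swaps two strings between the calling thread and a helper thread. */
    static String[] swap(String a, String b) {
        SingleSlotExchanger<String> ex = new SingleSlotExchanger<>();
        String[] got = new String[2];
        Thread helper = new Thread(() -> {
            try {
                got[1] = ex.exchange(b);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        helper.start();
        try {
            got[0] = ex.exchange(a);
            helper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return got;
    }

    public static void main(String[] args) {
        String[] got = swap("left", "right");
        System.out.println("caller got " + got[0] + ", helper got " + got[1]);
    }
}
```

The waiter sets parked before it re-checks slot, and the partner clears slot before it reads parked, so either the waiter sees the changed slot and skips parking or the partner sees parked and calls unpark. This is the same ordering that slotExchange relies on.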
Next, let's look at the logic of arenaExchange.
arenaExchange() method
arenaExchange completes the exchange through the arena field. The overall logic is fairly complex; the key points are:
- m, the largest currently valid arena index (bound & MMASK), starts at 0, and a thread's index i also starts at 0. Both are always greater than or equal to 0, and i is never greater than m. When a thread keeps failing to claim the array element at its index, it tries to grow m to m + 1 and then claims the new element at m + 1: it CASes that element from null to its own Node and spin-waits for a match. If the spin ends without a matching thread, it resets the element to null, decreases m by 1, and goes around the for loop again to claim some other element that is null. Under extreme concurrency m keeps growing until it reaches the maximum FULL; after that, threads can only go around the for loop, trying to match other threads or to claim null elements. As concurrency drops, m shrinks back to 0. Adjusting m dynamically avoids the CAS failures caused by too many threads updating the same element and improves matching efficiency. The idea is the same as in the implementation of LongAdder.
- A thread is put to sleep with the Unsafe park method only when m equals 0. Otherwise, the threads waiting for a match spin while they wait for partners, because the swap itself is fast and short-lived, and spinning lets several waiting threads complete their matches quickly. Blocking with park is considered only when little contention is left, in which case m must be 0, and no partner has shown up within a short time.
// Entered after contention on slot, when arena is not null
private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena;
    Node p = participant.get();
    // index starts at 0
    for (int i = p.index;;) {                      // access slot at i
        int b, m, c; long j;                       // j is raw array offset
        // The arena was allocated with its capacity << ASHIFT so that used elements do not share a cache line,
        // so << ASHIFT is also needed here to get the real offset of the array element
        Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // If q is not null, CAS the array element to null: the current thread is matched with the thread that owns q
        if (q != null && U.compareAndSwapObject(a, j, q, null)) {
            Object v = q.item;                     // release
            q.match = item;                        // hand over our item; the exchange succeeded
            Thread w = q.parked;
            if (w != null)                         // wake up the waiting thread
                U.unpark(w);
            return v;
        }
        // Either q is null, or q is not null and the CAS on it failed
        // Right after bound is initialized, SEQ & MMASK is 0, so m is 0 and i is 0
        else if (i <= (m = (b = bound) & MMASK) && q == null) {
            p.item = item;                         // offer
            if (U.compareAndSwapObject(a, j, null, p)) {
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                Thread t = Thread.currentThread(); // wait
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;
                    if (v != null) {
                        U.putOrderedObject(p, MATCH, null);
                        p.item = null;             // clear for next use
                        p.hash = h;
                        return v;
                    }
                    else if (spins > 0) {
                        h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                        if (h == 0)                // initialize hash
                            h = SPINS | (int)t.getId();
                        else if (h < 0 &&          // approx 50% true
                                 (--spins & ((SPINS >>> 1) - 1)) == 0)
                            Thread.yield();        // two yields per wait
                    }
                    else if (U.getObjectVolatile(a, j) != p)
                        spins = SPINS;             // releaser hasn't set match yet
                    else if (!t.isInterrupted() && m == 0 &&
                             (!timed ||
                              (ns = end - System.nanoTime()) > 0L)) {
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                        p.parked = t;              // minimize window
                        if (U.getObjectVolatile(a, j) == p)
                            U.park(false, ns);
                        p.parked = null;
                        U.putObject(t, BLOCKER, null);
                    }
                    else if (U.getObjectVolatile(a, j) == p &&
                             U.compareAndSwapObject(a, j, p, null)) {
                        if (m != 0)                // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                        p.item = null;
                        p.hash = h;
                        i = p.index >>>= 1;        // descend
                        if (Thread.interrupted())
                            return null;
                        if (timed && m == 0 && ns <= 0L)
                            return TIMED_OUT;
                        break;                     // expired; restart
                    }
                }
            }
            else
                p.item = null;                     // clear offer
        }
        else {
            if (p.bound != b) {                    // stale; reset
                p.bound = b;
                p.collides = 0;
                i = (i != m || m == 0) ? m : m - 1;
            }
            else if ((c = p.collides) < m || m == FULL ||
                     !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                p.collides = c + 1;
                i = (i == 0) ? m : i - 1;          // cyclically traverse
            }
            else
                i = m + 1;                         // grow
            p.index = i;
        }
    }
}
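The way arenaExchange grows and shrinks bound can be checked with plain arithmetic. In the sketch below, the constants and the expressions b + SEQ + 1 and b + SEQ - 1 are taken from the code above; the class BoundEncoding and the helper version, which reads the bits above MMASK, are illustrative additions.

```java
public class BoundEncoding {
    static final int MMASK = 0xff;
    static final int SEQ = MMASK + 1;

    /** m in arenaExchange: the largest valid arena index. */
    static int maxIndex(int bound) {
        return bound & MMASK;
    }

    /** The bits above MMASK, which change on every update of bound. */
    static int version(int bound) {
        return bound >>> 8;
    }

    /** Growing: index + 1 and version + 1 in a single addition. */
    static int grow(int bound) {
        return bound + SEQ + 1;
    }

    /** Shrinking: index - 1 and version + 1 in a single addition. */
    static int shrink(int bound) {
        return bound + SEQ - 1;
    }

    public static void main(String[] args) {
        int b = SEQ;                     // value installed when the arena is created
        System.out.println("initial: m=" + maxIndex(b) + " version=" + version(b));
        b = grow(b);
        System.out.println("grown:   m=" + maxIndex(b) + " version=" + version(b));
        b = shrink(b);
        System.out.println("shrunk:  m=" + maxIndex(b) + " version=" + version(b));
    }
}
```

After one grow and one shrink, m is back at 0 but bound itself has changed from 256 to 768. That difference is what lets a thread notice, through the p.bound != b check, that its cached bound is stale and reset its collision count.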
Conclusion
Exchanger is similar to SynchronousQueue in that two threads operate on the same object to hand data over. The difference is that SynchronousQueue uses a single field for the data and tells the two roles apart with the isData flag, and it uses a queue to line up the waiting threads.
Exchanger uses two fields of one object, item and match. It does not need an isData field, because Exchanger has no such semantics. Concurrency among many threads is handled with an array, in which each thread accesses a different slot.
PS: The above code is submitted to Github: github.com/Niuh-Study/…