Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star
1. Function Introduction
SynchronousQueue is a member of the BlockingQueue family that, unlike other members, has the following characteristics:
1. The entire queue has no capacity. Each time you put a value in the queue, you must wait for the corresponding consumer to remove it before you can put it again. 3. The whole queue is divided into TransferQueue FIFO and non-fair mode (TransferStack LIFO default) 4. If TransferQueue is used, there is always a dummy node in the queueCopy the code
2. Constructor
/** * Creates a {@code SynchronousQueue} with nonfair access policy */ public SynchronousQueue() { this(false); } /** * Creates a {@code KSynchronousQueue} with the specified fairness policy * @param fair */ public SynchronousQueue(Boolean fair){transferer = fair? SynchronousQueue(Boolean fair){transferer = fair? new TransferQueue<E>() : new TransferStack<E>(); }Copy the code
We can see that the default TransferStack is used as the internal node container, and we can use fair to determine fairness
3. Fair mode TransferQueue
/** * This is a very typical queue, which has the following characteristics * 1. Dummy dummy dummy dummy dummy dummy dummy dummy dummy dummy dummy dummy dummy */ ** head */ transient volatile QNode head; */ ** head */ transient volatile QNode head; /** TRANSIENT QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was last inserted node * when it was cancelled */ /** * * When you want to delete node node, if node node is the end of the queue, start using this node. * Why? * It is known that deleting a node can be done by a.casnext (B, b.ext) directly, but when node B is the last element in the whole queue, * One thread deletes node B and another thread inserts node after node B, which is easy to cause the loss of the inserted node. This cleanMe is similar to * ConcurrentSkipListMap delete add marker node, they all perform the same function */ TRANSIENT volatile QNode cleanMe; TransferQueue(){/** * constructs a dummy node, */ dummy node h = new QNode(null, false); */ dummy node h = new QNode(null, false); head = h; tail = h; Next = this, help GC, * this is the same as ConcurrentLinkedQueue */ void advanceHead(QNode h, QNode nh){ if(h == head && unsafe.compareAndSwapObject(this, headOffset, h, nh)){ h.next = h; Void advanceTail(QNode t, QNode nt){ if(tail == t){ unsafe.compareAndSwapObject(this, tailOffset, t, nt); }} /** CAS set cleamMe node */ Boolean casCleanMe(QNode CMP, QNode val){ return cleanMe == cmp && unsafe.compareAndSwapObject(this, cleanMeOffset, cmp, val); }Copy the code
The TransferQueue is a dual queue, which is initialized with a dummy node by default. The most special one is cleanMeNode, cleanMeNode is a tag node, and cleanMenode. next node is the node that needs to be deleted due to interruption or timeout. When the last node of the queue is cleared, this node is not deleted directly, but the successor node of the deleted nodes is marked as cleanMe node to prepare for the next deletion, and its function is similar to that of marker node in ConcurrentSkipListMap. ConcurrentSkipListMap (ConcurrentSkipListMap, ConcurrentSkipListMap, ConcurrentSkipListMap)
3. Fair mode TransferQueue Transfer method
The main logic of this method:
1. If the queue is empty/the last node in the queue is of the same type as its own, add node to the queue. The timeout/ Interrupt awaitFulfill method returns either null (as returned by producer) if the node itself matches until a timeout/interrupt/ other thread matches that thread. Or actually pass the value (returned by the consumer) 2. The queue is not empty, and the head.next node of the queue is the node matched by the current node, performs data transfer matching, and dequeue the node of the previous block through advanceHead methodCopy the code
Look directly at the code transfer
/** * Puts or takes an item ** @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanosecond * @return */ @Override E transfer(E e, boolean timed, long nanos) { /** * Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for gurading against * seeing uninitialized head or tail value. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, The main method, which is usually faster * than having them implicity interspersed * *, can be divided into two cases * * 1. If the queue is empty/the last node in the queue is of the same type as its own, add node * to the queue. The timeout/interrupt/ interrupt method returns node itself * if the match is successful, either null (as returned by producer) will be returned. Or actually pass the value (returned by consumer) * * 2. Dequeue */ QNode s = null; advanceHead = dequeue */ QNode s = null; // constrcuted/reused as needed boolean isData = (e ! = null); // 1. = null is used to distinguish producer from consumer for(;). { QNode t = tail; QNode h = head; if(t == null || h == null){ // 2. Data is not initialized, continue redo continue; // spin } if(h == t || t.isData == isData){ // 3. QNode tn = t.next; QNode tn = t.next; QNode tn = t.next; if(t ! = tail){// 4. } if(tn ! = null){// 5. Other threads add tail.next, so help advanceTail advanceTail(t, tn); continue; } if(timed && nanos <= 0){ // 6. Call method of type WAIT, and timeout, return NULL, Poll () : synchronousQueue.poll () returns null if a thread on the current queue has a matching thread on it. } if(s == null){s = new QNode(e, isData); T. casNext(null, s)){// 8. Add new node to queue continue;} advanceTail(t, s); Object x = awaitFulfill(S, e, timed, nanos); Call awaitFulfill, if the node is head.next, perform some spin, if not, block until another thread matches. If (x == s){// if(x == s){if(x == s){ Otherwise, x == null (s is producer) or (s is consumer) Clean (t, s); If s is not the last node in the linked list, CAS will delete the node directly. If S is the last node in the linked list, CAS will either delete the previous cleamMe node (cleamMe! = null), then set s.rev to cleanMe node, delete next time or set s.rev directly to cleanMe return null;} if(! {// 13. Node s has no offList advanceHead(t, s); // 14. Advance the head node, and next time call the s.ext node to match (advanceHead is called here, because the code can be executed here). = null){// and forget fields s.tem = s;} s.waiter = null; = null) ? (E)x :e; }else{// 16. Perform thread matching from head. Next Forever and the head node is a dummy node this and AQS) QNode m = h.n ext; / / 17. Access to the head. The next ready to match the if (t! = tail | | m = = null | | h! = head){ continue; // 18. Inconsistent reads, The queue structure is inconsistent with read} /** Producer and consumer matching operations * 1. Get m's item * 2 Only produce and consumer can be matched to * 3. X == m determines whether node M has been canceled. (QNOde#tryCancel) * 4. M. casitem exchanges data between producer and consumer (cas operation may fail if the CAS operation succeeds) * 5. Why dequeue h instead of m Head. Next */ Object x = M. tem; if(isData == (x! = null) | | / / 19. The two patterns match (because the concurrent environment may have left other threads strong matching nodes) x = = m | | / / 20 m nodes thread interruption or wait timeout! M. casitem (x, e) // 21. Perform CAS operation to change the item value of the waiting thread (concumer/producer is possible){advanceHead(h, m); } advanceHead(h, m); Locksupport. unpark(m.waiter); // 23. (e)x: (x!= null) (e)x: (e)x: (x!= null) If the producer is a consumer, x == null. If the producer is a consumer, return e}}}.Copy the code
OK, let’s comb through the general process:
1. The queue is empty at the beginning, and the thread is encapsulated as QNode directly. The thread enters the spin wait state through awaitFulfill method, and waits until a thread matches the queue unless timeout or the thread is interrupted. If the isData of the next thread is the same as that of the tail node, the first step is carried out, otherwise the data transfer (Step 21) is carried out, and then unpark the waiting thread 3. The waiting thread wakes up, returns from the awaitFulfill method, and finally returns the resultCopy the code
4. Fair mode TransferQueue awaitFulfill
/** * Spins/blocks until node s is depressing ** Spins a few minutes if the node is head. Next, or spins a few minutes if it doesn't, call locksupport.park/parkNanos(), Wait until another thread wakes up * * @param s the waiting node * @param e the comparsion value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s of cancelled */ Object awaitFulfill(QNode s, E e, boolean timed, long nanos){ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = thread.currentThread (); // timed = true Thread w = thread.currentThread (); Spins = ((head. Next == s)? // 4. Timed timed? If the current node is head. maxTimeSpins : maxUntimedSpins) : 0); for(;;) {// loop until successful if(w.isinterrupted ()){// 4. If the thread is interrupted, directly add item = this, and the return value will be judged in transfer (Step 11 in Transfer) s.cancel (e); } Object x = s.item; if(x ! If (e){// 5. = e, return, return x; } if(timed){ nanos = deadline - System.nanoTime(); If (nanos <= 0L){// 6. Wait time out, change the item value of node, continue, next to step 5 of awaitFulfill -> return s.cancel (e); continue; }} if(spins > 0){// spins decrease once -spins; } else if(s.waiter == null){ s.waiter = w; } else if(! Timed){// 8. Timed without timeout park locksupport. park(this); } else if(nanos > spinForTimeoutThreshold){ // 9. Park locksupport. parkNanos(this, nanos); }}}Copy the code
Combing logic:
1. Calculate timeout time (if time = true) 2. Dummy node = head. Next (dummy node exists in queue, AQS also); if so, spin is assigned. Other nodes do not need spin. Next comes the spin, which blocks until another thread wakes up, or the thread interrupts (in this case, the thread interrupts returns Node itself).Copy the code
5. Fair mode TransferQueue Clean
** * Gets rid of cancelled node s with original predecessor pred. */ void clean(QNode pred, QNode s) { s.waiter = null; // Forget thread // 1. exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version * first. At least one of node s or the node previously * saved can always be * * The last inserted node cannot be deleted at any point in the program's run (deletion here means direct deletion via CAS, * When s is the last node, save s.color as cleamMe node, While (pred.next == s) {// Return early if already unlinked // 2. Pred. next = next QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head if (hn ! = null && hn.iscancelled ()) {// 3. Head cancelled (hn) = null && hn.iscancelled ()) { Exit loop advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) // 4. If the queue is empty, another thread has deleted the node (note that there is always a dummy node) return; QNode tn = t.next; if (t ! = tail) // 5. Other threads change tail, continue again continue; if (tn ! = null) { advanceTail(t, tn); // 6. Help push tail continue; } if (s ! = t) { // If not tail, try to unsplice // 7. Node S is not the tail node, then CAS deletes the node directly (there is no risk of doing this in the middle of the queue). if (sn == s || pred.casNext(s, sn)) return; } QNode dp = cleanMe; If (dp! = null) { // Try unlinking previous cancelled node QNode d = dp.next; // 9. CleanMe is not null, delete s node once, namely node d QNode dn; if (d == null || // d is gone or // 10. There are several special cases: 1. The original s node (), which is the node d here, has been deleted; 2. The original node cleanMe has been deleted through advanceHead; 3 The original node S has been deleted (so! D.s iCancelled), the presence of these three, clear the cleanMe directly d = = dp | | / / d is off the list the or! d.isCancelled() || // d not cancelled or (d ! = t && // d not tail and // 11. D is not tail and dn does not have offlist. This is done by cleanMe to clean the nodes in the middle of the queue (dn = d.ext)! = null && // has successor dn ! = d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); CleanMe node, where dp == pred is true, clean node s, On success, return directly, otherwise loop again, proceed to Step 13, set cleanMe this time and return if (dp == pred) return; } else if (casCleanMe(null, pred)) Then mark pred as cleamMe to mark return for clearing S node next time. // Postpone cleaning s } }Copy the code
The clean method is the crux of the entire code analysis process:
2. Significance of the existence of cleanMe nodeCopy the code
This method is called when the node thread interrupts or waits for timeout. Two cases are discussed when clearing:
Select * from ConcurrentLikedQueue (s, s.next); select * from ConcurrentLikedQueue (s, s.next); 1) At this time cleanMe == NULL, then the pred of the predecessor node is marked as cleanMe to prepare for the next deletion. 2) At this time cleanMe! = null, delete the node that was deleted last time, then cleanMe to null, and then assign preD to cleanMe. Marker and cleanMe both work to prevent multiple nodes from being deleted in a concurrent environmentCopy the code
Reference:
SynchronousQueue Source Code Analysis (Java 8)