1. Introduction

In total, there are eight blocking queues in Java.

We have already analyzed:

  • Concurrent programming: SynchronousQueue core source analysis
  • Concurrent programming: ConcurrentLinkedQueue source analysis
  • Concurrent programming: LinkedBlockingQueue source analysis
  • Concurrent programming: ScheduledThreadPoolExecutor source analysis, which also covered DelayedWorkQueue

ArrayBlockingQueue is implemented with Condition and ReentrantLock; I’m not going to analyze it.

LinkedBlockingDeque is a blocking queue backed by a doubly linked list. It is often used in work-stealing algorithms. I may analyze it when I get the chance.

DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements. It is implemented internally with a PriorityQueue. I may analyze it when I get the chance.

PriorityBlockingQueue is an unbounded blocking queue that supports priorities, similar to DelayedWorkQueue. I may analyze it when I get the chance.

Today’s analysis is of one of the more interesting remaining queues: the LinkedTransferQueue.

Why is it interesting? It is a sort of hybrid of LinkedBlockingQueue and SynchronousQueue.

We know that SynchronousQueue cannot store elements internally, so a thread adding an element blocks until a consumer arrives, which is not ideal. LinkedBlockingQueue relies on locks internally, which limits its performance under contention.

Wouldn’t combining the two be perfect? High performance, and no forced blocking.
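A quick way to see the contrast is to call the non-blocking offer on an empty instance of each queue. This is only a small sketch; the class and method names (QueueComparison, offerResults) are made up for the illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper class, used only for this illustration.
class QueueComparison {
    static String offerResults() {
        // SynchronousQueue has no capacity: offer fails unless a consumer is already waiting.
        boolean sync = new SynchronousQueue<String>().offer("x");
        // LinkedBlockingQueue stores the element (default capacity is Integer.MAX_VALUE).
        boolean linked = new LinkedBlockingQueue<String>().offer("x");
        // LinkedTransferQueue is unbounded, so offer stores the element as well.
        boolean transfer = new LinkedTransferQueue<String>().offer("x");
        return sync + " " + linked + " " + transfer;
    }

    public static void main(String[] args) {
        System.out.println(offerResults()); // prints "false true true"
    }
}
```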

Let’s take a look.

2. Introducing LinkedTransferQueue

This class implements the TransferQueue interface, which defines several methods:

public interface TransferQueue<E> extends BlockingQueue<E> {
    // Transfers the element to a waiting consumer immediately, if possible.
    // More precisely, if a consumer is already waiting to receive it (in take() or a
    // timed poll(long, TimeUnit)), the element is handed over at once and true is
    // returned; otherwise false is returned and the element is not enqueued.
    boolean tryTransfer(E e);

    // Transfers the element to a consumer, waiting if necessary.
    // More precisely, if a consumer is already waiting to receive it (in take() or a
    // timed poll(long, TimeUnit)), the element is handed over at once; otherwise the
    // call waits until a consumer receives the element.
    void transfer(E e) throws InterruptedException;

    // Same as transfer, but with a timeout: returns false if the timeout elapses
    // before a consumer receives the element.
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // Returns true if at least one consumer is waiting.
    boolean hasWaitingConsumer();

    // Returns an estimate of the number of waiting consumers.
    int getWaitingConsumerCount();
}

Compared with an ordinary blocking queue, it adds several methods.
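The added methods can be tried out with a short sketch. It assumes one consumer thread blocked in take(); the class name TransferQueueBasics and the method handOff are invented for this example, and the busy-wait on hasWaitingConsumer() is just a simple way to make the demo deterministic.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class TransferQueueBasics {
    static String handOff() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // No consumer is waiting yet, so the element is rejected, not enqueued.
        boolean before = queue.tryTransfer("early");

        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // blocks until an element arrives
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Wait until the consumer has registered itself in the queue.
        while (!queue.hasWaitingConsumer()) {
            Thread.yield();
        }
        int waiting = queue.getWaitingConsumerCount();

        // Now a consumer is waiting, so the element is handed over directly.
        boolean after = queue.tryTransfer("late");
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return before + " " + waiting + " " + after + " " + received.get();
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // prints "false 1 true late"
    }
}
```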

3. Key source code analysis

A blocking queue provides put, take, offer, poll and other methods, and TransferQueue adds transfer and tryTransfer on top of them. Let’s look at how these methods are implemented.

The put method:

public void put(E e) {
     xfer(e, true, ASYNC, 0);
}

The take method:

public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    // A null result is possible only if the wait was interrupted.
    Thread.interrupted();
    throw new InterruptedException();
}

The offer method:

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

The poll method:

public E poll() {
    return xfer(null, false, NOW, 0);
}

The tryTransfer method:

public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}

The transfer method:

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

Strikingly, every one of these methods delegates to xfer, just with different arguments.

The first argument is the element itself for put-type operations and null otherwise. The second argument, haveData, says whether the call carries data: true for put-type operations, false for take-type operations. The third argument, how, is the execution mode: NOW (return immediately), ASYNC (enqueue without waiting), SYNC (block until matched) or TIMED (block with a timeout). The fourth argument, nanos, is the timeout and is only used in TIMED mode.

So the key method of this class is xfer.
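The effect of the modes can be observed from the outside through the public methods, without any consumer thread. The following is a minimal sketch; XferModesDemo and run are names invented here, and the mode named in each comment is taken from the method bodies shown above.

```java
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical demo class; it only calls public LinkedTransferQueue methods.
class XferModesDemo {
    static String run() {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();

        queue.put(1);                               // ASYNC: enqueued, returns at once
        boolean offered = queue.offer(2);           // ASYNC: always true, the queue is unbounded
        boolean transferred = queue.tryTransfer(3); // NOW: no waiting consumer, so 3 is not enqueued

        Integer first = queue.poll();               // NOW: matches the oldest element
        Integer second = queue.poll();
        Integer third = queue.poll();               // NOW: nothing left, returns null at once

        return offered + " " + transferred + " " + first + " " + second + " " + third;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true false 1 2 null"
    }
}
```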

4. Analysis of the xfer method

Source code with comments:

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        // Start from the head.
        for (Node h = head, p = h; p != null;) { // find & match first node
            // The type of the node: true for a put-type (data) node.
            boolean isData = p.isData;
            // The node's item.
            Object item = p.item;
            // item != p: the node has not been cancelled.
            // (item != null) == isData: the node is still unmatched, i.e. a put-type
            // node that still holds its item, or a take-type node whose item is still null.
            if (item != p && (item != null) == isData) {
                // The current operation has the same type as this node, so they
                // cannot match: leave the loop and fall through to the if block below.
                if (isData == haveData)   // can't match
                    break;
                // The types differ: try to match by replacing the node's item with e.
                if (p.casItem(item, e)) { // match
                    // Update the head.
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // Wake up the thread waiting on the matched node.
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // Move on to the next node.
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // No match was found; if the mode is not NOW, the node must be enqueued.
        if (how != NOW) {                 // No matches available
            // Create the node the first time we get here.
            if (s == null)
                s = new Node(e, haveData);
            // Try to append the node to the tail of the queue; returns its predecessor.
            Node pred = tryAppend(s, haveData);
            // null means the append failed because a node of the opposite mode
            // was found, so start over.
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // If the mode is not ASYNC (that is, SYNC or TIMED), block until matched.
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

The code is a little long, but the logic is very simple.

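The logic is as follows: start from the head node. If it is an unmatched node of the opposite type, match it directly by swapping in the item and waking its waiting thread. If no match is available, append a new node to the queue and, for SYNC or TIMED calls, wait for a match.

Note: at any moment the unmatched nodes in the queue are all of one type, either put or take.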
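The matching rule can be modelled with a deliberately simplified, single-threaded toy. This is not the JDK implementation: there is no CAS, no parked thread and no cancellation, and the names ToyDualQueue, mode and delivered are invented for the sketch. It only shows why the pending nodes are always of one type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of the matching rule; not the JDK code.
class ToyDualQueue {
    private static final class Node {
        final boolean isData; // true for a put-type node, false for a take-type node
        final String item;
        Node(boolean isData, String item) { this.isData = isData; this.item = item; }
    }

    private final Deque<Node> nodes = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>(); // items handed to pending takes

    void put(String item) {
        Node head = nodes.peekFirst();
        if (head != null && !head.isData) {
            nodes.pollFirst();          // opposite type at the head: match it
            delivered.add(item);        // the real queue would unpark the waiting consumer
        } else {
            nodes.addLast(new Node(true, item)); // same type or empty: enqueue
        }
    }

    String take() {
        Node head = nodes.peekFirst();
        if (head != null && head.isData) {
            return nodes.pollFirst().item;       // opposite type at the head: match it
        }
        nodes.addLast(new Node(false, null));    // the real take would block here
        return null;
    }

    // Describes the pending nodes; they are always all DATA or all REQUEST.
    String mode() {
        Node head = nodes.peekFirst();
        if (head == null) return "EMPTY";
        return (head.isData ? "DATA" : "REQUEST") + " x" + nodes.size();
    }

    List<String> delivered() { return delivered; }
}
```

Calling take() on the empty toy leaves one REQUEST node; a following put("a") consumes it instead of adding a DATA node, so the two types never coexist.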

The whole process is shown below:

Compared with SynchronousQueue, it adds a queue that can actually store elements; compared with LinkedBlockingQueue, it can hand elements directly to waiting consumers and synchronizes with CAS instead of locks.

Higher performance, more useful.

5. Summary

LinkedTransferQueue is a combination of SynchronousQueue and LinkedBlockingQueue: it performs better than LinkedBlockingQueue because it uses no locks, and unlike SynchronousQueue it can store elements.

On put, the element is handed directly to a waiting thread if there is one; otherwise it is appended to the queue.

The difference between put and transfer is that put returns immediately, while transfer blocks until a consumer receives the element. transfer is similar to the put method of SynchronousQueue.
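The difference can be seen without a consumer thread. In the sketch below, the timed tryTransfer stands in for transfer so that the demo terminates instead of blocking forever; the class name PutVsTransfer, the method compare and the 50 ms timeout are arbitrary choices made for the illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class contrasting put with a timed transfer.
class PutVsTransfer {
    static String compare() {
        LinkedTransferQueue<String> putQueue = new LinkedTransferQueue<>();
        putQueue.put("queued");                 // returns at once, no consumer needed
        int sizeAfterPut = putQueue.size();     // the element stays in the queue

        LinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        boolean received;
        try {
            // Waits for a consumer; with none around, it gives up after the timeout.
            received = transferQueue.tryTransfer("handoff", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            received = false;
        }
        int sizeAfterTimeout = transferQueue.size(); // the element was not left behind

        return sizeAfterPut + " " + received + " " + sizeAfterTimeout;
    }

    public static void main(String[] args) {
        System.out.println(compare()); // prints "1 false 0"
    }
}
```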