Preface

BlockingQueue is an excellent example of ReentrantLock in use. Building on the same principle, we can implement persistent-connection (long-connection) chat on the Web. Its most common use, though, is the producer-consumer pattern, as shown in the following figure:

In Java, BlockingQueue is an interface. Its implementation classes include ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, and others. They differ mainly in their storage structure or in how they operate on elements, but the principles behind take and put are similar. The source code below uses ArrayBlockingQueue as the example.
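As a rough illustration of the producer-consumer pattern mentioned above, here is a minimal sketch built on ArrayBlockingQueue. The class name ProducerConsumerSketch, the run method, and the POISON_PILL sentinel are assumptions made for this example and are not part of the JDK or the original article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerSketch {
    // Hypothetical sentinel value that tells the consumer to stop.
    private static final int POISON_PILL = -1;

    /** Runs one producer and one consumer and returns what the consumer received. */
    public static List<Integer> run(int itemCount, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // blocks while the queue is empty
                    if (value == POISON_PILL) {
                        break;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // join makes the consumer's writes visible to the caller
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

The queue capacity (2) is deliberately smaller than the number of items (5), so the producer has to wait for the consumer at some point instead of racing ahead.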

Analysis

ArrayBlockingQueue holds a single ReentrantLock, from which it creates two conditions, as its field declarations and constructor show:

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
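To make the constructor's two checks concrete, the following sketch exercises it from the outside. The class ConstructorSketch and the helper rejectsCapacity are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class ConstructorSketch {
    /** Returns true if the constructor rejects the given capacity. */
    public static boolean rejectsCapacity(int capacity) {
        try {
            new ArrayBlockingQueue<String>(capacity, false);
            return false;
        } catch (IllegalArgumentException expected) {
            return true; // thrown by the capacity <= 0 check
        }
    }

    public static void main(String[] args) {
        // fair = true is passed through to new ReentrantLock(fair), so threads
        // blocked on the queue are served in FIFO order.
        ArrayBlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(3, true);
        System.out.println(fairQueue.remainingCapacity()); // prints 3
        System.out.println(rejectsCapacity(0));            // prints true
    }
}
```

Fairness generally costs some throughput, which is presumably why the single-argument constructor defaults to a non-fair lock.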

If notEmpty, notFull, the put thread, and the take thread were personified, put and take might look something like this:

[Figure: put(e)]

[Figure: take()]

ArrayBlockingQueue.put(E e)

/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await(); // If the queue is full, wait
        insert(e);
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal(); // An element was inserted: wake one thread waiting to take
}

ArrayBlockingQueue.take()

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // If the queue is empty, wait
        return extract();
    } finally {
        lock.unlock();
    }
}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal(); // An element was removed: wake one thread waiting to put
    return x;
}

As you can see, put(E) and take() are guarded by the same lock, so they never run at the same time. When the queue is full, put blocks until a slot becomes free. When the queue is empty, take blocks until a new element arrives.
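The blocking behavior can be observed from the outside with the timed variants offer and poll, which give up after a timeout instead of waiting forever. This is only a sketch: the class BlockingBehaviorSketch and its method names are invented for this example, and the 50 ms and 1000 ms timeouts are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingBehaviorSketch {
    /** A timed offer on a full queue waits, then reports failure. */
    public static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // the queue is now full
        return queue.offer("second", 50, TimeUnit.MILLISECONDS);
    }

    /** A timed poll on an empty queue waits, then returns null. */
    public static String pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        return queue.poll(50, TimeUnit.MILLISECONDS);
    }

    /** A put blocked on a full queue completes once another thread takes. */
    public static boolean putCompletesAfterTake() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first");
        Thread putter = new Thread(() -> {
            try {
                queue.put("second"); // waits until "first" is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        putter.start();
        String taken = queue.take(); // frees the only slot
        putter.join(1000);
        return "first".equals(taken) && !putter.isAlive() && "second".equals(queue.peek());
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerToFullQueue());      // prints false
        System.out.println(pollFromEmptyQueue());    // prints null
        System.out.println(putCompletesAfterTake()); // prints true
    }
}
```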

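Using two conditions ensures that signal() wakes a thread of the opposite kind. A put signals notEmpty, so it wakes a waiting take and never another put. A take signals notFull, so it wakes a waiting put and never another take.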
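The same idea can be reduced to a small standalone class. The sketch below is not the JDK implementation: TwoConditionBuffer is a hypothetical name, and it stores elements in an ArrayDeque rather than the circular array that ArrayBlockingQueue uses. It keeps only the part that matters here, one lock with two conditions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TwoConditionBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // takers wait here
    private final Condition notFull = lock.newCondition();  // putters wait here

    public TwoConditionBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(e);
            notEmpty.signal(); // wakes a taker only; putters wait on notFull
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E x = items.removeFirst();
            notFull.signal(); // wakes a putter only; takers wait on notEmpty
            return x;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TwoConditionBuffer<Integer> buffer = new TwoConditionBuffer<>(2);
        buffer.put(1);
        buffer.put(2);
        System.out.println(buffer.take()); // prints 1
        System.out.println(buffer.take()); // prints 2
    }
}
```

With a single condition shared by both sides, signal() could wake a thread of the same kind as the caller, which would re-check its loop condition and go back to waiting. A single-condition design would therefore typically need signalAll() to stay correct.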

Reference

  • Blog.csdn.net/t894690230/…
