Blocking queue family
- Android multithreaded LinkedBlockingQueue source code parsing
- SynchronousQueue for Android Multithreading
- Android multithreading DelayQueue source analysis
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable { private static final long serialVersionUID = -817911632652898426L; // ArrayBlockingQueue final Object[] items; Int takeIndex; Int putIndex; Int count; // Synchronize with ReentrantLock final ReentrantLock lock; Private final Condition notEmpty; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs; Public ArrayBlockingQueue(int capacity) {this(capacity,false); } // You can call fair fortruePublic ArrayBlockingQueue(int Capacity, Boolean fair) {if(capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public Boolean offer(E E) {object.requirenonNULL (E); final ReentrantLock lock = this.lock; // Synchronize lock.lock(); Try {// As you can see here, it returns if the queue is fullfalseDoes not block the thread calling the offer methodif (count == items.length)
return false;
else{// If the queue is not full, the enqueue method is called to add elements to the queue enqueue(e);return true; } } finally { lock.unlock(); }} public Boolean offer(E E, long timeout, long timeout) public Boolean offer(E E, long timeout, long timeout) TimeUnit unit) throws InterruptedException { Objects.requireNonNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // Get the interruptible lock lock.lockInterruptibly(); try {while(count == items.length) {// If the queue is full after the wait time, it returns directlyfalseFailed to add elementsif (nanos <= 0L)
return false; Nanos = notFull. AwaitNanos (nanos); } // If there is space in the queue, the element is added to the queue enqueue(e);return true; } finally { lock.unlock(); }} // The put method differs from the offer method in that if the queue is full, it will block the thread calling the put method. Public void put(E E) throws InterruptedException {objects.requirenonNULL (E); public void put(E E) throws InterruptedException {objects.requirenonNULL (E); final ReentrantLock lock = this.lock; The await() method will throw InterruptedException after the interrupt flag is set, so it is better to check if the interrupt flag is set before locking. If you set InterruptedException to throw directly, you do not need to obtain the lock.lockInterruptibly(); try {while(count == items.length) // Block if the queue is full until the notFull signal method is called, i.e. there is space in the queue notfull.await (); Enqueue (e); } finally { lock.unlock(); The poll method is used to fetch data from the queue and does not block the current thread public Epoll() { final ReentrantLock lock = this.lock; lock.lock(); Try {// We can see that null is returned if the queue is empty, otherwise dequeue is called to fetch datareturn(count == 0) ? null : dequeue(); } finally { lock.unlock(); }} // The poll overload method also adds a wait time, Public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
returndequeue(); } finally { lock.unlock(); }} //take is also used to fetch data from a queue, Unlike the poll method, however, which may block the current thread public E take() throws InterruptedException {final ReentrantLock lock = this.lock; lock.lockInterruptibly(); Try {// Blocks the current thread when the queue is emptywhile(count == 0) notEmpty.await(); // Until there is data in the queue, call dequeue to return the datareturndequeue(); } finally { lock.unlock(); Private void enQueue (E x) {// Assert Lock. getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // We can see that the queue is implemented through a loop array. When the array is full, the subscript becomes 0if(++putIndex == items.length) putIndex = 0; count++; // Activate threads that block because of notEmpty conditions, such as notempty.signal () above; } // The method to fetch data from the queue private Edequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] ! = null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // Set the subscript position of the array to null.if (++takeIndex == items.length) takeIndex = 0;
count--;
if(itrs ! = null) itrs.elementDequeued(); // Activate threads that block because of notFull conditions, such as notFull.signal(), the thread that calls the put method, above;returnx; } // The number of elements in the queue is locked, so the result is exactly a public intsize() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
returncount; } finally { lock.unlock(); }}}Copy the code
- ArrayBlockingQueue is a bounded blocking queue backed by an array. It uses a single global exclusive lock for both enqueue and dequeue, so only one thread can enqueue or dequeue at a time
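- The untimed offer and poll methods of ArrayBlockingQueue simply take the lock and return immediately, so they never block the calling thread (the timed overloads wait at most for the given timeout). Put and take use the condition objects of the reentrant lock: put waits while the queue is full and take waits while the queue is empty, blocking the current thread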
- Because the size method reads the count while holding the lock, ArrayBlockingQueue can use it to get the exact number of queue elements
You are welcome to follow my WeChat official account, and learn and grow together with me!