preface

I have long noticed that many developers rarely use, let alone understand, J.U.C (java.util.concurrent) in the JDK; yet it is a hurdle you have to clear in order to advance.

I have shared bits and pieces about it before, but never systematically, so I want to put together a series of articles on this package.

The content mainly includes the following parts:

  • Implementing a concurrency tool yourself, starting from its definition.
  • The standard JDK implementation.
  • Practical cases.

I believe that after covering these three angles you will come away with a solid understanding of each tool.

Since I have started a new series, I don't want to do it half-heartedly, so I plan to cover most of the classes in this package.

This article starts with ArrayBlockingQueue.

Implementing it ourselves

A few characteristics of blocking queues should be understood before implementing them yourself:

  • Basic queue features: First in, first out.
  • Writes block when the queue has no free space.
  • Takes block when the queue is empty.

A queue can be backed by several structures, usually an array or a linked list; understanding one of them is enough, since the differences mostly come down to the differences between arrays and linked lists themselves.

The ArrayBlockingQueue here is obviously implemented by an array given its name.

We first try to implement it according to these three features.

Initializing the queue

I created a custom class called ArrayQueue, which has the following constructor:

    public ArrayQueue(int size) {
        items = new Object[size];
    }

items is simply an Object array holding the data; it is created with the requested size at initialization time.

Writing to the queue

Writing to the queue is relatively simple: elements are stored into the array in order (a sketch of the put method is given after the wait/notify discussion below).

But there are a few caveats:

  • When the queue is full, the writing thread needs to block.
  • Once the write index passes the end of the array, it wraps around to index 0 (the array is used as a ring buffer).

Let's look at the first point: when the queue is full, the writing thread needs to block. How do we block a thread? From the outside it looks as if the thread is stuck and doing nothing.

There are several ways to achieve this effect:

  • Thread.sleep(timeout): put the thread to sleep.
  • Object.wait(): put the thread into the WAITING state.

Of course, join(), LockSupport.park() and the like also exist, but they are beyond the scope of this discussion.

Another very important feature of a blocking queue is that when space becomes available again (because an element has been taken), the writing thread needs to be woken up so that it can continue writing.

So Thread.sleep(timeout) is clearly not appropriate: it simply resumes after the timeout elapses, regardless of whether space has become available. What we want is for the thread to be woken up exactly when space frees up.

This requirement naturally points to Java's wait/notify mechanism for inter-thread communication; more approaches to thread communication are covered here: In-depth understanding of thread communication.

So the approach here is to call Object.wait() to put the thread into the waiting state once the queue is full, and to wake it up once space becomes available.

    /** blocking lock used when the queue is full */
    private Object full = new Object();

    /** blocking lock used when the queue is empty */
    private Object empty = new Object();

Two objects are declared, one for the queue-full condition and one for the queue-empty condition, so that each can be waited on and notified separately.

empty.notify() is called after a successful write; its purpose is to wake up a consuming thread that is blocked because the queue was empty.

Note that wait and notify must both be called inside a synchronized block on the corresponding object, because each requires that object's monitor to be held.
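The write method itself appears as a screenshot in the original post; below is a minimal sketch of how such a put method could be written against the fields above. The index and count fields (putIndex, takeIndex, count) and the size() accessor are my own additions for illustration and may differ from the original source.

    public class ArrayQueue<T> {

        /** Backing array that holds the queued elements */
        private final Object[] items;

        /** Next slot to write into; wraps back to 0 at the end of the array */
        private int putIndex;

        /** Next slot to read from; also wraps around */
        private int takeIndex;

        /** Current number of elements; volatile so the checks below see a fresh value */
        private volatile int count;

        /** blocking lock used when the queue is full */
        private final Object full = new Object();

        /** blocking lock used when the queue is empty */
        private final Object empty = new Object();

        public ArrayQueue(int size) {
            items = new Object[size];
        }

        /** Writes an element, blocking while the queue is full. */
        public void put(T item) throws InterruptedException {
            synchronized (full) {
                // re-check after every wake-up; consumers call full.notify() after taking
                while (count == items.length) {
                    full.wait();
                }
                // mutations of shared state are guarded by 'this' so that
                // writers and consumers never modify it at the same time
                synchronized (this) {
                    items[putIndex] = item;
                    if (++putIndex == items.length) {
                        putIndex = 0;
                    }
                    count++;
                }
            }
            // wake a consumer that may be waiting on an empty queue
            synchronized (empty) {
                empty.notify();
            }
        }

        /** Current number of elements in the queue. */
        public int size() {
            return count;
        }
    }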

Consuming from the queue

As mentioned above, when the queue is empty, the consuming thread needs to block until there is data in the queue again.

The code is very similar to the write path and easy to follow; it is just that the waiting and the waking are reversed, as the summary below (and the sketch after it) shows:

In summary:

  • A write blocks when the queue is full, until a consuming thread takes an element and wakes it up.
  • A take blocks when the queue is empty, until a writing thread puts an element and wakes it up.
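Continuing the same illustrative sketch (again, the names are mine, not necessarily those of the original source), a matching take method might look like this:

        /** Takes an element, blocking while the queue is empty. */
        @SuppressWarnings("unchecked")
        public T take() throws InterruptedException {
            T item;
            synchronized (empty) {
                // re-check after every wake-up; writers call empty.notify() after putting
                while (count == 0) {
                    empty.wait();
                }
                synchronized (this) {
                    item = (T) items[takeIndex];
                    items[takeIndex] = null;        // help GC
                    if (++takeIndex == items.length) {
                        takeIndex = 0;
                    }
                    count--;
                }
            }
            // wake a writer that may be waiting on a full queue
            synchronized (full) {
                full.notify();
            }
            return item;
        }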

test

Let’s start with a basic test: single-threaded write and consume.
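The test code is shown as a screenshot in the original article; a minimal version along these lines (class name and values are illustrative) could be:

    public class ArrayQueueTest {

        public static void main(String[] args) throws InterruptedException {
            ArrayQueue<String> queue = new ArrayQueue<>(5);
            queue.put("123");
            queue.put("1234");
            queue.put("12345");
            System.out.println(queue.size());   // 3
            System.out.println(queue.take());   // 123
            System.out.println(queue.take());   // 1234
            System.out.println(queue.take());   // 12345
        }
    }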

    3, 123, 1234, 12345

It turns out there’s nothing wrong.


When more data is written than the queue can hold, writing can only continue after some of it has been consumed.

    2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0] 123
    2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size = 3
    2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234
    2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345
    2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456

The run results also show that new data can only be written after existing data has been consumed.


When there is no consumption, writing data to the queue causes the writing thread to block.

Concurrent test

Three threads concurrently write 300 items in total, and another thread then consumes a single item; a sketch of such a test follows.
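This is only an illustration of the idea (thread counts, item counts and the crude sleep are mine), not the original test code:

    public class ArrayQueueConcurrentTest {

        public static void main(String[] args) throws InterruptedException {
            ArrayQueue<Integer> queue = new ArrayQueue<>(300);

            // three writer threads, 100 items each (300 in total)
            for (int t = 0; t < 3; t++) {
                new Thread(() -> {
                    for (int i = 0; i < 100; i++) {
                        try {
                            queue.put(i);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }).start();
            }

            Thread.sleep(1000);                 // crude wait for the writers to finish
            System.out.println(queue.take());   // likely 0, the first element written
            System.out.println(queue.size());   // expected: 299
        }
    }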

    =====0
    299

The final queue size is 299, which shows that the queue is thread-safe.

The whole queue is thread-safe because both the write and the take paths acquire a lock before touching the shared state.

ArrayBlockingQueue

Let’s take a look at the implementation of the JDK standard ArrayBlockingQueue, which is better understood with the basics above.

Initializing the queue

It seems a little more complicated, but it’s easy to understand when you break it down:
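The screenshot from the original post is not reproduced here; the JDK 8 constructor looks roughly like this (comments added by me):

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];   // step 1: the backing array
        lock = new ReentrantLock(fair);      // step 2: the (optionally fair) reentrant lock
        notEmpty = lock.newCondition();      // step 3: condition that consumers wait on
        notFull  = lock.newCondition();      // step 4: condition that writers wait on
    }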

The first step initializes an array of the requested capacity, just as in our own version.

The second step initializes a ReentrantLock, which plays the same role as synchronized.

The reentrant lock is non-fair by default; you can pass true to get a fair lock, in which case waiting writers and consumers acquire the lock in the order they arrived.
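For example (the capacity value here is arbitrary):

    // fair = true: blocked writers and consumers acquire the lock in FIFO order
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(16, true);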

For more information about how to use ReentrantLock and how to implement it, see here

Steps three and four create the notEmpty and notFull conditions, which are used much like Object.wait/notify.

That’s the whole initialization thing, and it’s actually very similar to what we’ve done ourselves.

Writing to the queue

The blocking logic for writes is essentially the same; the difference is that a Lock is used to explicitly acquire and release the lock.
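The put method in the JDK 8 source looks roughly like this (comments added by me):

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();          // block the writer while the queue is full
            enqueue(e);                   // store the element and signal notEmpty
        } finally {
            lock.unlock();
        }
    }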

And notFull.await() / notEmpty.signal() serve the same purpose as Object.wait/notify.

Of course, it also implements the timeout blocking API.

This is also fairly simple: it just waits with a timeout.
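Roughly, the timed offer in the JDK 8 source looks like this:

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;                 // give up once the timeout has elapsed
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }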

Consuming from the queue

Now look at the take path:
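Again not reproducing the screenshot, the JDK 8 take method is roughly:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();         // block the consumer while the queue is empty
            return dequeue();             // remove the element and signal notFull
        } finally {
            lock.unlock();
        }
    }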

It’s almost the same. It’s easy to see.

The timeout variant likewise uses notEmpty.awaitNanos(nanos) and returns once the timeout expires, but I won't go into the details.

A practical case

With that said, let’s look at a practical example of a queue.

Here’s the background:

A scheduled task reads a batch of data from the database at regular intervals, verifies the data and invokes a remote interface.

The simple approach is to have the scheduled task's own thread do everything: read the data, validate it, and call the interface. But there is a problem with that:

If the external interface misbehaves or the network is unstable, the call time grows and drags down the whole task, because every step is serial and they all affect each other.

So we improved the scheme:

It’s a typical producer-consumer model:

  • The production thread reads the message from the database and throws it into the queue.
  • The consuming thread retrieves data from the queue for business logic.

In this way, the two threads can be decoupled from each other through the queue, and the queue can also act as a buffer.

But there are a few small details worth noting.

Because the external interface supports batch calls, the consuming thread accumulates data in memory after fetching it, and only processes the batch once a size threshold is reached or a time window has elapsed.

However, through carelessness the developer used the blocking API queue.take() for consumption; normally it works fine.

But once the upstream data source (the DB) runs dry and the remaining queue data is drained, the consuming thread blocks.

As a result, the batch accumulated in memory during the last round is not processed until the data source produces data again; if that gap is long, serious business problems can occur.

So unless there is an explicit business requirement to block, it is better to use the timed queue.poll(timeout) API.
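A sketch of what the improved consumer loop might look like; all names here (BatchConsumer, callRemoteInterface, the thresholds) are hypothetical and not taken from the real project:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BatchConsumer implements Runnable {

        private static final int BATCH_SIZE = 100;            // hypothetical size threshold
        private static final long FLUSH_INTERVAL_MS = 5_000;  // hypothetical time window

        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

        @Override
        public void run() {
            List<String> batch = new ArrayList<>();
            long lastFlush = System.currentTimeMillis();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // poll with a timeout instead of take(): even when the DB (and therefore
                    // the queue) runs dry, the loop keeps ticking and a half-filled batch
                    // still gets flushed once the time window expires
                    String msg = queue.poll(200, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        batch.add(msg);
                    }
                    boolean sizeReached = batch.size() >= BATCH_SIZE;
                    boolean timeReached = System.currentTimeMillis() - lastFlush >= FLUSH_INTERVAL_MS;
                    if (!batch.isEmpty() && (sizeReached || timeReached)) {
                        callRemoteInterface(batch);   // batch call to the external interface
                        batch.clear();
                        lastFlush = System.currentTimeMillis();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Placeholder for the real batch call to the external interface. */
        private void callRemoteInterface(List<String> batch) {
            // ...
        }
    }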

This habit also applies to other scenarios, such as HTTP calls and RPC interface calls that require a reasonable timeout.

conclusion

That concludes this look at ArrayBlockingQueue; other concurrent containers and concurrency utilities will follow in later articles.

If you have any questions about this article, please leave a comment.

All source code involved in this article:

Github.com/crossoverJi…

Your likes and shares are the biggest support for me