This is the fourth article in a series called Asynchronous Processing in Android and iOS Development. In this article, we’ll focus on the queue structure commonly used in client programming, its asynchronous programming style, and related interface design issues.
A few days ago, a colleague came over to discuss a technical problem. Here's the situation: he is working on a mobile game that synchronizes data to the server every time the user does something on the client. In the traditional way of handling network requests, after the user initiates an operation, they must wait for it to complete, while the interface shows a waiting indicator (such as a spinning loading icon). When the request completes, the client's display is updated and the user can initiate the next action. However, the game requires users to be able to do a lot of things in a short amount of time. If every operation had to go through a request-and-wait cycle, the experience would be poor.
What is needed here is a queue of action tasks. Instead of waiting for an operation to complete, the user simply puts the operation into the queue and moves on to the next one. However, when an error occurs in the queue, there must be a unified error-handling process. Of course, the server also needs to cooperate, for example by handling duplicate operations carefully (de-duplication).
This article discusses the issues involved in designing and implementing such queues.
Note: the code for this series has been collected on GitHub (and is continuously updated). The repository address is:
- Github.com/tielei/Asyn…
The Java code for this article is in the package com.zhangtielei.demos.async.programming.queueing.
Overview
There are many scenarios for using queues in client programming. Here are a few of them.
- Sending chat messages. Most chat apps now let users enter several messages in a row, without waiting for the first one to be sent before typing the second. The system ensures that messages are sent in order. If a message fails to send because of poor network conditions, the system retries several times so that it gets delivered whenever possible. Behind this is a message-sending queue that lines up the messages and performs a limited number of retries on error.
- Uploading multiple photos at once. If users can upload many photos at a time, the upload takes a long time and generally requires one or more queues. With retries, the queue can also support resuming interrupted uploads from where they stopped (which, of course, requires server support).
- Making key high-frequency operations asynchronous to improve the experience. Examples include the rapid-fire game actions mentioned above, or posting photos and commenting on others' posts in WeChat Moments: the user does not have to wait for the network request to finish before doing the next thing. A queue mechanism sits behind this too.
For the sake of discussion, we refer to a queue that queues a series of operations with some failure retry capability as a “task queue.”
This article is divided into three sections to discuss the topic of asynchronous tasks and task queues.
- The traditional Thread-Safe Queue (TSQ).
- A lock-free queue suited to the client programming environment. This section designs the interface in the classic callback style for asynchronous tasks. For a detailed discussion of callbacks for asynchronous tasks, see the second article in this series.
- A queue based on the reactive programming ideas of RxJava. In this section, we'll see how RxJava affects interface design for asynchronous tasks.
Thread-Safe Queue
In a multithreaded environment, any discussion of queues has to mention TSQ. It is a classic tool for passing data between threads in order. Its structure is shown below.
Consumers and producers are on different threads so that they are decoupled and production is not blocked by consumption. If you use TSQ for a task queue, then production is equivalent to the user’s actions producing the task, and consumption is equivalent to the task starting and executing.
The consumer thread runs in a loop, constantly trying to fetch data from the queue; if there is none, it blocks at the head of the queue. This blocking relies on a synchronization primitive provided by the operating system.
Decoupling through queues is an important idea. Taken further, extending the idea of TSQ across processes gives the Message Queue commonly used in distributed systems, which plays a key role in decoupling heterogeneous services and masking performance differences between them.
TSQ is relatively rare in client programming for several reasons:
- It requires an additional separate thread to be started as a consumer.
- The "main thread -> asynchronous thread -> main thread" programming pattern (described in the Run Loop section of the first article in this series) suits the client environment better. It lets both producers and consumers run on the main thread, which removes the need for a thread-safe queue; an ordinary queue is enough (covered in the next section).
We mention TSQ here mainly because it is a classic and makes a useful point of comparison with the other approaches. We won't show its demo source code here; see GitHub for details. The GitHub demo uses a ready-made TSQ implementation from the JDK: LinkedBlockingQueue.
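The GitHub demo is not reproduced here. As a separate, minimal sketch of the structure described above (class and method names are illustrative, not taken from the demo), the snippet below uses LinkedBlockingQueue directly: the calling thread produces tasks without waiting, and a dedicated consumer thread blocks in take() while the queue is empty. The "poison pill" used to stop the consumer is a common convention, not something the article prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal producer/consumer sketch on top of the JDK's LinkedBlockingQueue.
class TsqDemo {
    // Sentinel telling the consumer to stop (the "poison pill" convention).
    private static final String POISON = "__STOP__";

    static List<String> run(List<String> actions) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> executed = new ArrayList<>();

        // Consumer thread: loops, blocking in take() whenever the queue is empty.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String task = queue.take();
                    if (POISON.equals(task)) {
                        break;
                    }
                    executed.add("done:" + task);   // "execute" the task
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer (the calling thread): enqueues and moves on without waiting.
        for (String action : actions) {
            queue.offer(action);
        }
        queue.offer(POISON);

        try {
            consumer.join();   // wait only so the results can be returned safely
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }
}
```

Note the extra thread this requires: that cost is one of the reasons given above for why TSQ is uncommon as a client-side task queue.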
Callback-based task queue
As shown in the figure above, both producers and consumers run on a single thread, the main thread. To implement a task queue this way, the tasks themselves must be asynchronous; otherwise a running task would block the main thread, and the task queue as a whole would not be asynchronous.
We define the interface for the asynchronous task to execute as follows:
public interface Task {
    /** An ID that uniquely identifies the current task. */
    String getTaskId();

    /**
     * Starts the task. Since the task is asynchronous, start only launches it;
     * the TaskListener is called back when the task completes.
     * Note: start must be called on the main thread.
     */
    void start();

    /** Sets the callback listener. */
    void setListener(TaskListener listener);

    /** Callback interface for the asynchronous task. */
    interface TaskListener {
        /** Called when the current task completes. */
        void taskComplete(Task task);

        /** Called when one execution of the current task fails; cause is the reason for the failure. */
        void taskFailed(Task task, Throwable cause);
    }
}
Since Task is asynchronous, we define a callback interface, TaskListener, for it.
getTaskId returns an ID that uniquely identifies the current task, so that tasks can be told apart precisely.
In addition, we use a Throwable object to express the cause of failure in a general way. (Note: this is not necessarily a practice to follow in real programming.)
One might ask: the Task interface is defined as asynchronous, so what if you want to execute a synchronous task? That is actually easy. There are many ways to turn a synchronous task into an asynchronous one (but not the other way around).
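As one illustration (an assumption, not the approach used in the GitHub repository), a synchronous piece of work can be wrapped in the asynchronous Task interface by running it on a background executor and delivering the callback through a second Executor that stands in for the main thread. SyncTaskAdapter and SyncTaskAdapterDemo are hypothetical names; the Task interface is repeated so the snippet compiles on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// The article's Task interface, repeated so this snippet compiles on its own.
interface Task {
    String getTaskId();
    void start();
    void setListener(TaskListener listener);

    interface TaskListener {
        void taskComplete(Task task);
        void taskFailed(Task task, Throwable cause);
    }
}

// Hypothetical adapter: runs blocking work on a background executor and delivers
// the outcome through callbackExecutor, which stands in for the main thread.
class SyncTaskAdapter implements Task {
    private final String id;
    private final Callable<?> work;
    private final ExecutorService background;
    private final Executor callbackExecutor;
    private volatile TaskListener listener;

    SyncTaskAdapter(String id, Callable<?> work, ExecutorService background, Executor callbackExecutor) {
        this.id = id;
        this.work = work;
        this.background = background;
        this.callbackExecutor = callbackExecutor;
    }

    @Override public String getTaskId() { return id; }

    @Override public void setListener(TaskListener listener) { this.listener = listener; }

    @Override
    public void start() {
        // start() returns immediately; the blocking work happens on another thread.
        background.execute(() -> {
            Throwable error = null;
            try {
                work.call();
            } catch (Throwable t) {
                error = t;
            }
            final Throwable cause = error;
            // Hop back to the "main" thread before notifying the listener.
            callbackExecutor.execute(() -> {
                TaskListener l = listener;
                if (l == null) return;
                if (cause == null) l.taskComplete(this);
                else l.taskFailed(this, cause);
            });
        });
    }
}

class SyncTaskAdapterDemo {
    // Runs one adapted task and returns "complete" or "failed: <message>".
    static String runOnce(Callable<?> work) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        String[] outcome = new String[1];
        Task task = new SyncTaskAdapter("demo", work, background, Runnable::run);
        task.setListener(new Task.TaskListener() {
            @Override public void taskComplete(Task t) { outcome[0] = "complete"; done.countDown(); }
            @Override public void taskFailed(Task t, Throwable cause) {
                outcome[0] = "failed: " + cause.getMessage();
                done.countDown();
            }
        });
        task.start();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            background.shutdown();
        }
        return outcome[0];
    }
}
```

The demo passes Runnable::run as the callback executor only so it can run outside Android; on Android the same slot could be filled by posting to a Handler bound to the main Looper.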
The interface of the task queue is defined as follows:
public interface TaskQueue {
    /** Adds a task to the queue. */
    void addTask(Task task);

    /** Sets the listener. */
    void setListener(TaskQueueListener listener);

    /** Destroys the queue. Note: a queue should be explicitly destroyed after its last use. */
    void destroy();

    /** External listener interface of the task queue. */
    interface TaskQueueListener {
        /** Called when a task completes. */
        void taskComplete(Task task);

        /** Called when a task has finally failed; cause is the reason for the failure. */
        void taskFailed(Task task, Throwable cause);
    }
}
TaskQueue's own operations are asynchronous too: addTask merely puts the task into the queue, and to learn when the task completes (or fails), the caller has to listen through the TaskQueueListener interface.
Note that TaskQueueListener's taskFailed differs from TaskListener's taskFailed: the former means the task has finally failed after a certain number of attempts, whereas the latter only means that one execution failed.
We focus on the implementation of TaskQueue here. How Task is implemented is not our concern; we only care about its interface. The code of the TaskQueue implementation is as follows:
import android.util.Log;

import java.util.LinkedList;
import java.util.Queue;

public class CallbackBasedTaskQueue implements TaskQueue, Task.TaskListener {
    private static final String TAG = "TaskQueue";

    /** Maximum number of executions per task (including the first); beyond this the task finally fails. */
    private static final int MAX_RETRIES = 3;

    /** Queue of tasks. It does not need to be thread-safe: everything runs on the main thread. */
    private final Queue<Task> taskQueue = new LinkedList<Task>();

    private TaskQueueListener listener;
    private boolean stopped;

    /** How many times the current task has been executed (it finally fails once this reaches MAX_RETRIES). */
    private int runCount;

    @Override
    public void addTask(Task task) {
        // A new task joins the queue
        taskQueue.offer(task);
        task.setListener(this);

        if (taskQueue.size() == 1 && !stopped) {
            // This is the only queued task, so start it immediately
            launchNextTask();
        }
    }

    @Override
    public void setListener(TaskQueueListener listener) {
        this.listener = listener;
    }

    @Override
    public void destroy() {
        stopped = true;
    }

    private void launchNextTask() {
        // Get the task at the head of the queue without removing it
        Task task = taskQueue.peek();
        if (task == null) {
            // Impossible case
            Log.e(TAG, "impossible: NO task in queue, unexpected!");
            return;
        }
        Log.d(TAG, "start task (" + task.getTaskId() + ")");
        // Set the counter before start(), in case the task calls back synchronously
        runCount = 1;
        task.start();
    }

    @Override
    public void taskComplete(Task task) {
        Log.d(TAG, "task (" + task.getTaskId() + ") complete");
        finishTask(task, null);
    }

    @Override
    public void taskFailed(Task task, Throwable error) {
        if (runCount < MAX_RETRIES && !stopped) {
            // We can still retry
            Log.d(TAG, "task (" + task.getTaskId() + ") failed, try again. runCount: " + runCount);
            runCount++;
            task.start();
        } else {
            // Final failure
            Log.d(TAG, "task (" + task.getTaskId() + ") failed, final failed! runCount: " + runCount);
            finishTask(task, error);
        }
    }

    /** Handles a task once it has finally finished (success or final failure). */
    private void finishTask(Task task, Throwable error) {
        // Notify the listener
        if (listener != null && !stopped) {
            try {
                if (error == null) {
                    listener.taskComplete(task);
                } else {
                    listener.taskFailed(task, error);
                }
            } catch (Throwable e) {
                Log.e(TAG, "", e);
            }
        }
        task.setListener(null);

        // Remove the task from the queue
        taskQueue.poll();

        // Start the next task in the queue
        if (taskQueue.size() > 0 && !stopped) {
            launchNextTask();
        }
    }
}
A few things to note in this implementation:
- All queue operations (offer, peek, poll) run on the main thread, so the queue data structure does not need to be thread-safe. We chose LinkedList as the implementation.
- A task is started at one of two moments:
  - In addTask, if the queue was empty (the new task is the first one), the task is started immediately.
  - When a task finishes (succeeds, or finally fails) and other tasks are waiting, the next task at the head of the queue is started.
- A single failure is not final; the task is retried. It only fails for good once it has been executed MAX_RETRIES times without success. runCount records how many times the current task has been executed.
The code for CallbackBasedTaskQueue reveals the basic implementation pattern for task queues.
The task queue's retry strategy for failed tasks greatly improves the chance of eventual success. In the GitHub demo, I set the task's failure probability very high (up to 80%), and with up to three executions per task there is still a reasonable chance that it eventually succeeds.
RxJava-based task queue
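To put a rough number on this, assume each execution fails independently with probability p. In CallbackBasedTaskQueue, runCount goes from 1 up to MAX_RETRIES, so a task is executed at most n = MAX_RETRIES = 3 times in total and finally fails only if every execution fails:

```latex
\begin{aligned}
P_{\text{final failure}} &= p^{n}, \qquad P_{\text{success}} = 1 - p^{n} \\
p = 0.8,\; n = 3:\quad P_{\text{success}} &= 1 - 0.8^{3} = 1 - 0.512 = 0.488
\end{aligned}
```

So, under the independence assumption and at the demo's highest failure rate, three executions raise the success rate from 0.2 for a single attempt to roughly 0.49.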
What’s the point of RxJava? There’s a lot of discussion online.
Some people say that RxJava is for asynchrony. That’s true, of course, but it’s not specific.
Some say that the real benefit of RxJava is its wide variety of transformation operators built on lift. Others say that RxJava's greatest use is its Schedulers mechanism, which makes it easy to switch threads. None of this is revolutionary.
So what’s the key? Personally, I think it has had a fundamental impact on callback interface design: it eliminates the need to define callback interfaces separately for each asynchronous interface.
Here is an example right away: the TaskQueue interface rewritten with RxJava.
import rx.Observable;

public interface TaskQueue {
    /**
     * Adds a task to the queue.
     * @param <R> type of the data returned when the asynchronous task finishes
     * @return an Observable through which the caller gets the task's result
     */
    <R> Observable<R> addTask(Task<R> task);

    /** Destroys the queue. Note: a queue should be explicitly destroyed after its last use. */
    void destroy();
}
Let’s take a closer look at the modified TaskQueue interface definition.
- The original callback interface, TaskQueueListener, is gone.
- The asynchronous method addTask now returns an Observable instead of nothing. The caller takes the Observable, subscribes to it, and receives the task's result (success or failure).

This change is crucial. Originally addTask returned nothing, and the caller had to listen on a callback interface to get results, which is how a typical asynchronous API works. Returning an Observable makes it feel much more like a synchronous interface. Put more abstractly, an Observable is something we hold in the present that refers to the future: the task has not run yet, yet we already have something concrete in hand. We can do a lot with it right now, including combining it with other Observables. That is where the real power of this idea lies.
Accordingly, the Task interface, which is also asynchronous, can be modified in the same way:
/**
 * Instead of using TaskListener to deliver callbacks, use an Observable.
 * @param <R> type of the data returned when the asynchronous task finishes
 */
public interface Task<R> {
    /** An ID that uniquely identifies the current task. */
    String getTaskId();

    /**
     * Starts the task. Note: start must be called on the main thread.
     * @return an Observable through which the caller gets the task's result
     */
    Observable<R> start();
}
The point here is the RxJava-based interface; the concrete queue implementation matters less, so we won't walk through its code here. See GitHub for details. Note that the GitHub implementation uses a trick to wrap an asynchronous task as an Observable: AsyncOnSubscribe.
Conclusion
TSQ revisited
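The same "return a handle instead of registering a callback" idea can be sketched without RxJava, using the JDK's CompletableFuture as a stand-in for Observable. This is a swapped-in technique, not the GitHub implementation: FutureTaskQueue is a hypothetical name, a task is modeled as a Supplier that starts one attempt and returns its future, and failed attempts are retried up to three executions, mirroring CallbackBasedTaskQueue. One important difference: CompletableFuture is eager, so a task starts as soon as it reaches the head of the queue whether or not anyone is listening, whereas an Observable waits for subscribe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: a sequential task queue whose addTask returns a
// CompletableFuture (standing in for an Observable) instead of using a listener.
// Like the article's queue, it assumes addTask is always called from one thread.
class FutureTaskQueue {
    // Maximum executions per task, including the first one.
    private static final int MAX_RETRIES = 3;

    // Completes when the most recently added task has finished, successfully or not.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // A task is a factory: each call starts one attempt and returns its future.
    <R> CompletableFuture<R> addTask(Supplier<CompletableFuture<R>> task) {
        // Start this task only after the previous one has finished.
        CompletableFuture<R> result = tail.thenCompose(ignored -> attempt(task, 1));
        // The next task waits for this one regardless of its outcome.
        tail = result.handle((value, error) -> null);
        return result;
    }

    private <R> CompletableFuture<R> attempt(Supplier<CompletableFuture<R>> task, int runCount) {
        CompletableFuture<R> outcome = new CompletableFuture<>();
        task.get().whenComplete((value, error) -> {
            if (error == null) {
                outcome.complete(value);
            } else if (runCount < MAX_RETRIES) {
                // Retry, and forward the later attempt's outcome.
                attempt(task, runCount + 1).whenComplete((v, e) -> {
                    if (e == null) {
                        outcome.complete(v);
                    } else {
                        outcome.completeExceptionally(e);
                    }
                });
            } else {
                outcome.completeExceptionally(error);   // final failure
            }
        });
        return outcome;
    }
}
```

The caller gets one object per task and can compose it further (thenApply, thenCombine, and so on), which is the property the RxJava version exploits; what it loses is the laziness of a cold Observable.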
We covered TSQ at the beginning of this article and pointed out that it is rarely used in client programming. But that’s not to say that TSQ doesn’t make sense in a client environment.
In fact, the client’s Run Loop (Android’s Looper) is itself a TSQ, otherwise it wouldn’t be able to safely pass messages and schedule tasks between threads. Because the client has a Run Loop, it is possible to implement the task queue in a lock-free manner. So our programming on the client side has always been tied to TSQ.
Incidentally, android.os.Looper in Android ultimately relies on the well-known epoll event mechanism of the Linux kernel.
What this article's task queue design leaves out
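To make "the Run Loop is itself a TSQ" concrete, here is a toy run loop built on LinkedBlockingQueue. It is a sketch under simplifying assumptions, not how android.os.Looper is implemented (the real MessageQueue also handles timed messages and waits via epoll). Any thread may post a Runnable, but only the loop thread runs them, which is what lets code on that thread use ordinary, non-thread-safe data structures. MiniLooper is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy run loop in the spirit of Android's Looper: any thread may post work,
// but only the loop thread runs it.
class MiniLooper {
    // The thread-safe queue is the only synchronization point between threads.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread thread;
    // Only read and written on the loop thread, so no volatile is needed.
    private boolean quitting;

    MiniLooper(String name) {
        thread = new Thread(this::loop, name);
        thread.start();
    }

    // Callable from any thread.
    void post(Runnable task) {
        queue.offer(task);
    }

    // Asks the loop to stop after the messages already posted have run.
    void quit() {
        post(() -> quitting = true);
    }

    Thread thread() {
        return thread;
    }

    private void loop() {
        try {
            while (!quitting) {
                queue.take().run();   // blocks while there are no messages
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Usage: posts from the calling thread and reports whether the work ran on the loop thread.
    static boolean demoRunsOnLoopThread() {
        MiniLooper looper = new MiniLooper("demo-loop");
        boolean[] ranOnLoop = new boolean[1];
        looper.post(() -> ranOnLoop[0] = (Thread.currentThread() == looper.thread()));
        looper.quit();
        try {
            looper.thread().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOnLoop[0];
    }
}
```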
The core task of this article is to explain asynchronous programming of task queues, so some design details are omitted. If you are implementing a task queue that can be used in a production environment, you may also want to consider the following points:
- This article only designs the success and failure callbacks of tasks, not the execution progress callbacks.
- This article does not cover task cancellations and pauses (we will cover this topic in our next article).
- Some details of the task queue, such as the maximum number of retries, should be configurable by the caller.
- The interaction between long-lived queues and short-lived pages is not considered here. In the demo code of the GitHub implementation, for simplicity, the task queue is destroyed when the demo page is closed, but that is not how it should be done in practice. How objects with long life cycles interact with short-lived ones turned out, as I later realized, to be an important question in its own right, which we may get a chance to discuss later.
- In Android, components that may run in the background for a long time, such as task queues, are generally encapsulated in a Service.
- Because the task queue retries failed tasks, the server must handle retries carefully (for example, by de-duplicating repeated operations).
- Error handling becomes complicated once a task in the queue finally fails.
Advantages and disadvantages of RxJava
Finally, we used RxJava to rewrite the task queue. This did simplify the interface a lot: callback interfaces are no longer needed, and callers can handle asynchronous tasks in a uniform way.
However, we also need to be aware of some issues with RxJava:
- RxJava is a heavy framework that is very abstract and difficult to understand. It is simple for the caller of the interface and difficult for the implementer of the interface. Sometimes it’s not obvious how to return an appropriate Observable instance when implementing an asynchronous interface.
- An Observable relies on subscription to drive its upstream operations. That is, if you just add a task and don't subscribe to it, it won't execute! If you just want to run a task and don't care about the result, that won't work. To draw a slightly far-fetched analogy, this is a bit like quantum mechanics, where observation affects the outcome...
- As a consequence of the previous point, in the GitHub implementation discussed in this article, the first task does not actually start running inside addTask; it is delayed until the caller subscribes. Moreover, the thread it runs on may be affected by the caller's Scheduler settings (for example, subscribeOn), so there is a risk that it does not run on the main thread.
- RxJava produces strange, confusing call stacks when debugging.
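The laziness described in the second point can be shown without RxJava. ColdResult below is a toy, hypothetical type that mimics a cold Observable (it is not RxJava's API): creating it only captures the work, and the work runs when, and each time, subscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a cold Observable (hypothetical, not RxJava's API):
// the work is captured at construction but runs only when subscribe is called.
class ColdResult<R> {
    // The deferred work: reports its result to exactly one of the two callbacks.
    interface Source<R> {
        void run(Consumer<R> onSuccess, Consumer<Throwable> onError);
    }

    private final Source<R> source;

    ColdResult(Source<R> source) {
        this.source = source;
    }

    // Every subscription runs the work again, as with a cold Observable.
    void subscribe(Consumer<R> onSuccess, Consumer<Throwable> onError) {
        source.run(onSuccess, onError);
    }
}

class ColdDemo {
    // Returns an event log showing that nothing runs until subscribe is called.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ColdResult<String> pending = new ColdResult<>((onSuccess, onError) -> {
            log.add("task started");          // the task's actual work
            onSuccess.accept("result");
        });
        log.add("task added");                // the task has not run yet
        pending.subscribe(r -> log.add("got " + r), e -> log.add("error " + e));
        return log;
    }
}
```

The log reads "task added", then "task started", then "got result": adding the task did nothing by itself, which is exactly why a fire-and-forget addTask does not work with this style of interface.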
Given these problems with RxJava, if I were to implement a fully functional task queue or other complex asynchronous tasks, especially as open source, I probably wouldn't make it depend entirely on RxJava. Instead, I might support both its own lightweight asynchronous mechanism and RxJava, as Retrofit does.
Before I close, let me pose one more interesting open question. The code on GitHub makes heavy use of anonymous classes (roughly equivalent to Java 8 lambda expressions), which can complicate the references between objects. Analyzing these reference relationships would be an interesting exercise: how are they established as the program runs, and how are they finally broken when objects are destroyed? Is there a memory leak? Comments are welcome.
In the next article, we’ll look at a more complex problem with asynchronous tasks: cancellation of asynchronous tasks.
(End)