Introduction

I’ve always been interested in caching. I originally looked at TMCache, but it had deadlock issues and was no longer maintained, so the PIN team rewrote it as PINCache, a caching framework based on TMCache. Before digging into PINCache, I recommend a very good article, “TMCache source code analysis”; it’s worth reading before the PINCache source itself.

Today’s focus is PINOperationQueue, the core of PINCache. We all know that NSOperationQueue is implemented on top of GCD, but if you had to implement an NSOperationQueue-like queue yourself using GCD, how would you do it? PINOperationQueue provides a good reference.

To better understand PINOperationQueue, I ported it to Swift as BEOperationQueue; see the GitHub repositories of PINCache and BECache. This article analyzes the BEOperationQueue code.

1. BEOperationQueue (PINOperationQueue)

1. Core attributes

1.1 Arrays that store operations

  • queueOperations
  • lowPriorityOperations
  • defaultPriorityOperations
  • highPriorityOperations

That is: a low-priority array, a default-priority array, a high-priority array, and a total array holding every operation.

1.2 Queues and semaphores

  • serialQueue: DispatchQueue(label: "BEOperation Serial Queue")
  • semaphoreQueue: DispatchQueue(label: "BEOperation Semaphore Queue")
  • concurrentQueue: DispatchQueue(label: "BEOperation Concurrent Queue", attributes: .concurrent)
  • concurrentSemaphore: DispatchSemaphore

In total: two serial queues, one concurrent queue, and one semaphore.
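As a hedged sketch, the queues and semaphore above might be declared roughly like this (the `QueueCore` class name and the initializer shape are my assumptions for illustration, not BEOperationQueue's actual code):

```swift
import Foundation

// Illustrative sketch of BEOperationQueue's queue/semaphore properties,
// reconstructed from the names listed above. `QueueCore` is a made-up name.
final class QueueCore {
    let serialQueue = DispatchQueue(label: "BEOperation Serial Queue")
    let semaphoreQueue = DispatchQueue(label: "BEOperation Semaphore Queue")
    let concurrentQueue = DispatchQueue(label: "BEOperation Concurrent Queue",
                                        attributes: .concurrent)
    let concurrentSemaphore: DispatchSemaphore

    init(maxConcurrentOperations: Int) {
        // The semaphore's initial value caps how many tasks can be
        // in flight on concurrentQueue at once.
        concurrentSemaphore = DispatchSemaphore(value: maxConcurrentOperations)
    }
}
```

The semaphore's initial value is what ultimately enforces the maximum concurrency, as the scheduling code below will show.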

2. Core methods

2.1 func scheduleOperation

After BEOperationQueue is initialized, this method is used to schedule and execute tasks, so execution starts with scheduleOperation.

func scheduleOperation(with workItem: @escaping OperationItem, priority: BEOperationQueuePriority = .default) -> BEOperationReference {
    let operation = BEOperation.operation(with: priority, reference: nextOperationReference(), workItem: workItem)
    lockOperation { locked_addOperation(with: operation) }
    scheduleNextOperation(with: false)
    return operation.reference!
}
  1. First wrap the incoming task as a BEOperation object.
  2. Add it to the low-, default-, or high-priority array according to its priority.
  3. Start execution, i.e. call the scheduleNextOperation method.
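Step 2 can be sketched with a stripped-down model. Here the `MiniQueue` type, the `Priority` enum, and the use of `String` operation IDs are illustrative assumptions, not the real BEOperation types:

```swift
// Illustrative model of adding an operation to both the total array and
// its priority-specific array. Names are made up for this sketch.
enum Priority { case low, `default`, high }

final class MiniQueue {
    var queueOperations: [String] = []          // total array
    var lowPriorityOperations: [String] = []
    var defaultPriorityOperations: [String] = []
    var highPriorityOperations: [String] = []

    func addOperation(_ id: String, priority: Priority) {
        queueOperations.append(id)              // every operation lands here
        switch priority {                       // plus its priority array
        case .low:     lowPriorityOperations.append(id)
        case .default: defaultPriorityOperations.append(id)
        case .high:    highPriorityOperations.append(id)
        }
    }
}
```

Keeping a total array alongside the per-priority arrays lets the serial path dequeue in FIFO order while the concurrent path dequeues by priority.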

scheduleNextOperation is the core method of the whole operation queue and the center of task execution. The Swift version is barely 30 lines, but it is very effective. Because this method is not easy to understand, I debugged it repeatedly and worked through it slowly; below I record my own understanding. If anything is wrong, corrections are welcome.

2.2 func scheduleNextOperation

scheduleNextOperation is the core method of the operation queue, responsible for scheduling and executing tasks. It divides into an upper half and a lower half, separated by the `if onlyCheckSerial { return }` line, and the upper half drives itself with a recursive call.

private func scheduleNextOperation(with onlyCheckSerial: Bool) {
    lock()
    if serialQueueBusy == false {
        if let operation = locked_nextOperationByQueue() {
            serialQueueBusy = true
            serialQueue.async {
                operation.workItems.forEach { $0() }
                self.group.leave()
                self.lockOperation { self.serialQueueBusy = false }
                self.scheduleNextOperation(with: true) // recursion
            }
        }
    }
    unlock()

    if onlyCheckSerial { return }
    if maxConcurrentOperations < 2 { return }
    semaphoreQueue.async {
        self.concurrentSemaphore?.wait()
        self.lock()
        let op = self.locked_nextOperationByPriority()
        self.unlock()
        if let operation = op {
            self.concurrentQueue.async {
                operation.workItems.forEach { $0() }
                self.group.leave()
                self.concurrentSemaphore?.signal()
            }
        } else {
            self.concurrentSemaphore?.signal()
        }
    }
}

2.3 scheduleNextOperation(with onlyCheckSerial: Bool) method analysis

To get a more intuitive picture of the scheduleNextOperation execution flow, I added nine tasks in the test code. After test() is called, scheduleNextOperation(with onlyCheckSerial: Bool) runs; the task execution process is explained in detail below.

func test() {
    let opQueue =  BEOperationQueue.init(maxConcurrentOperations: 2)
    opQueue.scheduleOperation(with: { sleep(10); print("BE-1-default") }, priority: .default)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-2-high") }, priority: .high)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-3-low") }, priority: .low)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-4-low") }, priority: .low)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-5-low") }, priority: .low)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-6-low") }, priority: .low)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-7-low") }, priority: .low)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-8-high") }, priority: .high)
    opQueue.scheduleOperation(with: { sleep(5); print("BE-9-high") }, priority: .high)
  }

Task execution process:

  1. When task 1 comes in, it reaches this code:
lock()
if serialQueueBusy == false {
    if let operation = locked_nextOperationByQueue() {
        serialQueueBusy = true
        serialQueue.async {
            operation.workItems.forEach { $0() }
            self.group.leave()
            self.lockOperation { self.serialQueueBusy = false }
            self.scheduleNextOperation(with: true) // recursion
        }
    }
}
unlock()

On entry, serialQueueBusy is set to true, meaning the serial queue is busy. The serialQueue then executes the task asynchronously; when it finishes, serialQueueBusy is set back to false and the method recursively calls itself, but with onlyCheckSerial: true, so the lower half is skipped. The recursion keeps pulling tasks out, one at a time, until there are none left.
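This recursive drive can be reproduced in isolation. The `SerialDrain` sketch below is my own illustrative model of the pattern, not BEOperationQueue code:

```swift
import Foundation

// Minimal model of the "recursive drive": after each task finishes on the
// serial queue, the drain calls itself to pull the next task, until the
// pending array is empty. All names here are illustrative.
final class SerialDrain {
    private let serialQueue = DispatchQueue(label: "drain")
    private let lock = NSLock()
    private var pending: [() -> Void] = []
    private var busy = false
    let group = DispatchGroup()

    func add(_ work: @escaping () -> Void) {
        lock.lock()
        pending.append(work)
        group.enter()
        lock.unlock()
        scheduleNext()
    }

    private func scheduleNext() {
        lock.lock()
        guard !busy, !pending.isEmpty else { lock.unlock(); return }
        busy = true
        let work = pending.removeFirst()
        lock.unlock()
        serialQueue.async {
            work()
            self.group.leave()
            self.lock.lock(); self.busy = false; self.lock.unlock()
            self.scheduleNext()          // recursion keeps the drain going
        }
    }
}
```

Tasks added while the queue is busy simply wait in `pending`; the recursive call at the end of each task picks them up in FIFO order.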

    if onlyCheckSerial { return } // the recursive call passes true, so it returns here
    if maxConcurrentOperations < 2 { return }
    semaphoreQueue.async {
        self.concurrentSemaphore?.wait()
        self.lock()
        let op = self.locked_nextOperationByPriority()
        self.unlock()
        if let operation = op {
            self.concurrentQueue.async {
                operation.workItems.forEach { $0() }
                self.group.leave()
                self.concurrentSemaphore?.signal()
            }
        } else {
            self.concurrentSemaphore?.signal()
        }
    }
  2. Now for the classic part, the block of code above.

When we submit nine tasks at once with the maximum concurrency set to 2, one task initially runs on the serialQueue via step 1, while the others flow through this code:

    semaphoreQueue.async {
        self.concurrentSemaphore?.wait()
        self.lock()
        let op = self.locked_nextOperationByPriority()
        self.unlock()
        if let operation = op {
            self.concurrentQueue.async {
                operation.workItems.forEach { $0() }
                self.group.leave()
                self.concurrentSemaphore?.signal()
            }
        } else {
            self.concurrentSemaphore?.signal()
        }
    }

Tasks enter the semaphoreQueue serial queue first. Each task calls self.concurrentSemaphore?.wait(), decrementing the semaphore by 1; once two tasks are in flight, the remaining ones block, waiting for the semaphore to become greater than 0. Each dequeued task is dispatched to concurrentQueue and executed concurrently; when it finishes, self.concurrentSemaphore?.signal() increments the semaphore by 1 and the next task enters from the semaphoreQueue, repeating the cycle. Meanwhile, the recursive code from step 1 is still running, still fetching tasks and executing them on the serialQueue.
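The wait/signal cycle can be demonstrated standalone. The sketch below (all names illustrative, not from BEOperationQueue) measures the peak number of tasks running at once and shows it never exceeds the cap:

```swift
import Foundation

// Standalone sketch of the pattern: a serial "feeder" queue blocks in wait()
// so that at most `maxConcurrent` tasks run on the concurrent queue at once.
func peakConcurrency(tasks: Int, maxConcurrent: Int) -> Int {
    let semaphore = DispatchSemaphore(value: maxConcurrent)
    let feeder = DispatchQueue(label: "feeder")
    let worker = DispatchQueue(label: "worker", attributes: .concurrent)
    let group = DispatchGroup()
    let statsLock = NSLock()
    var running = 0
    var peak = 0

    for _ in 0..<tasks {
        group.enter()
        feeder.async {
            semaphore.wait()              // blocks the feeder once the cap is reached
            worker.async {
                statsLock.lock()
                running += 1
                peak = max(peak, running)
                statsLock.unlock()
                Thread.sleep(forTimeInterval: 0.02)   // simulate work
                statsLock.lock(); running -= 1; statsLock.unlock()
                semaphore.signal()        // lets the feeder release the next task
                group.leave()
            }
        }
    }
    group.wait()
    return peak
}
```

Because only the feeder thread ever blocks in `wait()`, the caller returns immediately, yet the worker queue never runs more than `maxConcurrent` tasks at a time.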

At this point the scheduleNextOperation(with onlyCheckSerial: Bool) analysis is complete, and it should be clear how task execution is driven.

3. Thoughts

Now let’s think about a question: why did the author design the code this way? Why nest a concurrent queue inside a serial queue to execute tasks?

    semaphoreQueue.async {
        self.concurrentSemaphore?.wait()
        self.lock()
        let op = self.locked_nextOperationByPriority()
        self.unlock()
        if let operation = op {
            self.concurrentQueue.async {
                operation.workItems.forEach { $0() }
                self.group.leave()
                self.concurrentSemaphore?.signal()
            }
        } else {
            self.concurrentSemaphore?.signal()
        }
    }

Before answering, let me summarize what I learned from this method:

  1. A central “driver” method that makes flexible use of recursion.
  2. Combining a semaphore with a concurrent queue to cap the maximum concurrency.
  3. Nesting a concurrent queue inside a serial queue to execute tasks.

On point 3, why nest a concurrent queue inside a serial queue to execute tasks? My experimental results from debugging are as follows:

Suppose we remove the semaphoreQueue.async wrapper from the code above, i.e. call wait() and dispatch to the concurrent queue directly from the caller. Calling test() then blocks the main thread: once the semaphore reaches 0, wait() blocks the calling thread, and since each task sleeps for 5 seconds, the main thread is stuck.

In that case, if we execute test() on a background thread (say, a global concurrent queue), the main thread certainly won’t be stuck, but tasks are no longer dispatched in the order we scheduled them: because tasks are now added and dequeued concurrently, the order of operations in the queueOperations array and the low-, default-, and high-priority arrays becomes nondeterministic, unlike a serial queue where they are added and removed strictly one after another. [Tasks can be added quickly but are executed slowly.]
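The ordering guarantee the serial feeder provides can be seen in a tiny standalone demo (names illustrative): blocks submitted asynchronously to a serial queue run strictly in submission order.

```swift
import Foundation

// Why the serial feeder preserves order: a serial queue dequeues blocks
// strictly in the order they were submitted. Illustrative, standalone demo.
func serialSubmissionOrder(count: Int) -> [Int] {
    let serial = DispatchQueue(label: "order-demo")
    let group = DispatchGroup()
    var order: [Int] = []
    for i in 1...count {
        group.enter()
        serial.async {            // serial: each block runs after the previous one
            order.append(i)
            group.leave()
        }
    }
    group.wait()                  // safe to read `order` once all blocks finish
    return order
}
```

With a concurrent queue in place of `serial`, the appends could interleave in any order, which is exactly the nondeterminism described above.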