GCD's dispatch_queue in detail
Updated on 2020-12-13: added source code analysis of asynchronous task execution.
Queues are the part of GCD we work with most often, and reading the dispatch_queue source helps in understanding how GCD handles multiple threads. The libdispatch source is fairly complex; this analysis is based on libdispatch-187.9.
1. The relationship between queues and threads
The relationship between threads and GCD is illustrated by the following image from Concurrent Programming: APIs and Challenges:
Threads and queues do not have a one-to-one relationship. One thread may run tasks from several queues, and those queues may be serial or concurrent and may be used synchronously or asynchronously.
For the main thread and the main queue, the main queue is a serial queue on the main thread that is automatically created for us by the system. In other words, the main thread can perform tasks on other queues besides the main queue. We can test this with the following code:
// Test code
override func viewDidLoad() {
    super.viewDidLoad()
    let sQueue = DispatchQueue(label: "sQueue")
    // A synchronous task on a serial queue does not create a new thread. The task runs on the current thread, so Thread.current here is the main thread
    sQueue.sync { print(Thread.current) }
}
2. Definition of queues
2.1 dispatch_queue_s
dispatch_queue_s is the queue structure and arguably the structure we meet most often in GCD. GCD uses many macros that make the code harder to read, so the macros are replaced here with the structures they expand to.
To make the later analysis easier, the fields are annotated below.
struct dispatch_queue_s {
// Part 1: DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s), expanded below
const struct dispatch_queue_vtable_s *do_vtable; // This type of structure contains operations on dispatch_queue_s
struct dispatch_queue_s *volatile do_next; // List next
unsigned int do_ref_cnt; // Reference count
unsigned int do_xref_cnt; // External reference count
unsigned int do_suspend_cnt; // Suspend count. For delayed tasks, for example, the timer changes this value when the task is due and then wakes the queue for scheduling
struct dispatch_queue_s *do_targetq; // Target queue. GCD lets a queue hand its tasks to another queue for execution
void *do_ctxt; // Context. Stores thread-pool data such as the semaphore used to suspend and wake threads and the thread pool size
void *do_finalizer;
// Part 2: DISPATCH_QUEUE_HEADER
uint32_t volatile dq_running; // The number of tasks running in the queue
uint32_t dq_width; // Maximum number of concurrent requests: The maximum number of concurrent requests in the main queue/serial queue is 1
struct dispatch_object_s *volatile dq_items_tail; // Queue end
struct dispatch_object_s *volatile dq_items_head; // Queue header
unsigned long dq_serialnum; // Queue sequence number
dispatch_queue_t dq_specific_q; // Specific queue
char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // Queue name; must be less than 64 characters long
char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};
2.2 dispatch_queue_vtable_s
The dispatch_queue_vtable_s structure holds the functions that operate on a GCD queue (a dispatch_object_s). Short macros wrap these functions so that they are easy to call.
// dispatch_queue_vtable_s structure, which declares functions to operate on the dispatch_queue_s structure
struct dispatch_queue_vtable_s {
// DISPATCH_VTABLE_HEADER(dispatch_queue_s);
unsigned long const do_type;
const char *const do_kind;
size_t (*const do_debug)(struct dispatch_queue_s *, char *, size_t);
// Used to invoke (wake up) the queue; NULL for the global queues and the main queue
struct dispatch_queue_s *(*const do_invoke)(struct dispatch_queue_s *);
// Used to check whether some values in the passed object meet the criteria
bool (*const do_probe)(struct dispatch_queue_s *);
// The dispose function for this object, usually called internally
void (*const do_dispose)(struct dispatch_queue_s *);
};
queue.c defines three constant instances of dispatch_queue_vtable_s:
// For main queues and custom queues
const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
.do_type = DISPATCH_QUEUE_TYPE,
.do_kind = "queue",
.do_dispose = _dispatch_queue_dispose,
.do_invoke = NULL,
.do_probe = (void *)dummy_function_r0,
.do_debug = dispatch_queue_debug,
};
// For global queues
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_wakeup_global,
};
// For the management queue
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_kind = "mgr-queue",
.do_invoke = _dispatch_mgr_thread,
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_mgr_wakeup,
};
3. Queue type
Queues fall into four types: the main queue, the management queue, custom queues, and global queues.
3.1 Main queue
Use dispatch_get_main_queue() to obtain the main queue, which is defined as follows:
#define dispatch_get_main_queue() (&_dispatch_main_q)
struct dispatch_queue_s _dispatch_main_q = {
#if !DISPATCH_USE_RESOLVERS
.do_vtable = &_dispatch_queue_vtable,
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY], // One of the root queues
#endif
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.dq_label = "com.apple.main-thread",
.dq_running = 1,
.dq_width = 1, // The main queue is a serial queue
.dq_serialnum = 1, // Main queue serial number
};
do_vtable
The do_vtable of the main queue is _dispatch_queue_vtable.
do_targetq
do_targetq is the target queue. The meaning of the target queue is summarized in more detail in the do_targetq analysis of the global queue.
The target queue definition for the main queue:
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.default-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 7,
},
do_ref_cnt, do_xref_cnt
do_ref_cnt and do_xref_cnt are reference counts used for the memory management of GCD objects. For the main queue both values are DISPATCH_OBJECT_GLOBAL_REFCNT.
void dispatch_retain(dispatch_object_t dou) {
if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
// ...
}
void _dispatch_retain(dispatch_object_t dou) {
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
// ...
}
void dispatch_release(dispatch_object_t dou) {
if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return;
}
// call _dispatch_release;
// ...
}
void _dispatch_release(dispatch_object_t dou) {
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
// Calls dx_dispose, the macro that wraps do_dispose
// ...
}
The functions above show that:
- The main queue lives as long as the application and is not affected by retain and release.
- For other objects, the object is released when both do_ref_cnt and do_xref_cnt reach 0.
3.2 Management queue
The management queue is internal to GCD and is not exposed publicly. As the name suggests, it plays a management role; GCD timers, for example, use the management queue.
struct dispatch_queue_s _dispatch_mgr_q = {
    .do_vtable = &_dispatch_queue_mgr_vtable,
    .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
    .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
    .do_targetq = &_dispatch_root_queues[
            DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
    .dq_label = "com.apple.libdispatch-manager",
    .dq_width = 1,
    .dq_serialnum = 2, // Management queue serial number
};
do_vtable
The do_vtable of the management queue is _dispatch_queue_mgr_vtable.
do_targetq
The target queue of the management queue:
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.high-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 9,
},
do_ref_cnt, do_xref_cnt
Both values for the management queue are DISPATCH_OBJECT_GLOBAL_REFCNT, so its life cycle is the same as that of the main queue.
3.3 Custom queues
Create a custom queue with dispatch_queue_create(const char *label, dispatch_queue_attr_t attr). Its source code is as follows:
dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) {
    dispatch_queue_t dq;
    size_t label_len;
    if (!label) {
        label = "";
    }
    label_len = strlen(label);
    if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
        label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
    }
    // XXX switch to malloc()
    dq = calloc(1ul, sizeof(struct dispatch_queue_s) -
            DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD +
            label_len + 1);
    if (slowpath(!dq)) {
        return dq;
    }
    // _dispatch_queue_init(dq);
    // Queue initialization expands as follows
    dq->do_vtable = &_dispatch_queue_vtable;
    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    dq->do_ref_cnt = 1;
    dq->do_xref_cnt = 1;
    // Default target queue is overcommit!
    // i.e. _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY]
    dq->do_targetq = _dispatch_get_root_queue(0, true);
    dq->dq_running = 0;
    dq->dq_width = 1;
    dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
    strcpy(dq->dq_label, label);
    if (fastpath(!attr)) {
        return dq;
    }
    // For a concurrent queue, set the maximum width; UINT32_MAX can be regarded as unlimited concurrency
    if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
        dq->dq_width = UINT32_MAX;
        // For a concurrent queue overcommit is false, so the target queue is
        // _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY]
        dq->do_targetq = _dispatch_get_root_queue(0, false);
    } else {
        dispatch_debug_assert(!attr, "Invalid attribute");
    }
    return dq;
}
slowpath(x), fastpath(x)
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l)) // ~0l has all bits set (-1), not 1
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))
fastpath(x) tells the compiler that x is expected to be non-zero, that is, the condition holds most of the time. slowpath(x) tells the compiler that x is expected to be 0, that is, the condition rarely holds. __builtin_expect lets programmers pass branch prediction information to the compiler. Its semantics are: I expect the value of the expression to equal the constant c, and the compiler should optimize based on that expectation.
do_vtable
As with the main queue, the do_vtable of the custom queue is _dispatch_queue_vtable.
do_targetq
There are two kinds of target queue for custom queues:
- A serial queue obtains its target queue with _dispatch_get_root_queue(0, true); the result is _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY].
- A concurrent queue obtains its target queue with _dispatch_get_root_queue(0, false); the result is _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY].
The overcommit parameter of _dispatch_get_root_queue(long priority, bool overcommit) means that a new thread is started whenever a block is executed, no matter how busy the system is, so that no single thread is overloaded.
dq_serialnum
dq_serialnum is assigned by atomically incrementing _dispatch_queue_serial_numbers, which starts at 12. Serial numbers 1 through 11 are reserved and defined as follows:
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
static unsigned long _dispatch_queue_serial_numbers = 12;
1 is used for the main queue, 2 for the management queue, 3 is currently unused, and 4 through 11 are used for the global queues. Because this version of the source is fairly old, Apple has added several more queues since.
3.4 Global Queue
We’ve talked a lot about global queues. Now let’s look at how global queues are defined.
dispatch_queue_t dispatch_get_global_queue(long priority, unsigned long flags) {
if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
return NULL;
}
return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
static inline dispatch_queue_t _dispatch_get_root_queue(long priority, bool overcommit) {
if (overcommit) switch (priority) {
case DISPATCH_QUEUE_PRIORITY_LOW:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY];
}
switch (priority) {
case DISPATCH_QUEUE_PRIORITY_LOW:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY];
default:
return NULL;
}
}
struct dispatch_queue_s _dispatch_root_queues[] = {
[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
.dq_label = "com.apple.root.low-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 4,
},
[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.low-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 5,
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
.dq_label = "com.apple.root.default-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 6,
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.default-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 7,
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
.dq_label = "com.apple.root.high-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 8,
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.high-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 9,
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
.dq_label = "com.apple.root.background-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 10,
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.background-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 11,
},
};
do_vtable
The do_vtable of the global queue is _dispatch_queue_root_vtable. Its do_probe function is _dispatch_queue_wakeup_global, which is used to wake up the global queues.
do_targetq
The main queue, the management queue, and custom queues all use a global queue (taken from the root queues) as their target queue, but the global queues themselves have no do_targetq set.
Concurrent Programming: APIs and Challenges puts it this way:
While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system's global queues and its thread pool(s).
In other words, every queue used with GCD, whether it is a custom queue, the main queue, a global queue, or the management queue, ultimately ends up on one of GCD's root queues. GCD manages only these root queues.
do_ref_cnt, do_xref_cnt
Both values for the global queues are DISPATCH_OBJECT_GLOBAL_REFCNT, so their life cycle is the same as that of the main queue.
do_ctxt
There is a context property in the global queue that stores thread pool-related data, such as semaphore for thread suspension and awakening, thread pool size, and so on. It is defined as follows:
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
};
4. Queue synchronization: dispatch_sync analysis
The test code
// Serial queue
let sQueue = DispatchQueue(label: "sQueue")
print(1)
sQueue.sync { print("2: \(Thread.current)") }
print(3)
sQueue.sync { print("4: \(Thread.current)") }
print(5)
// Concurrent queue
let cQueue = DispatchQueue(label: "cQueue", attributes: [.concurrent])
print(1)
cQueue.sync { print("2: \(Thread.current)") }
print(3)
cQueue.sync { print("4: \(Thread.current)") }
print(5)
The execution result
1
2: <NSThread: 0x600002478980>{number = 1, name = main}
3
4: <NSThread: 0x600002478980>{number = 1, name = main}
5
1
2: <NSThread: 0x600002478980>{number = 1, name = main}
3
4: <NSThread: 0x600002478980>{number = 1, name = main}
5
The global queue test is omitted, but the result is the same: synchronous execution starts no new thread, every task runs on the current thread, and the current thread is blocked until the task finishes. (Calling sync on the main queue from the main thread deadlocks; see 4.4.)
4.1 Entry function: dispatch_sync
The source code of dispatch_sync is as follows:
void dispatch_sync(dispatch_queue_t dq, void (^work)(void)) {
// DISPATCH_COCOA_COMPAT is enabled on Mac OS
#if DISPATCH_COCOA_COMPAT
// Is the main queue
if (slowpath(dq == &_dispatch_main_q)) {
// The dispatch_sync_f function is executed internally
return _dispatch_sync_slow(dq, work);
}
#endif
struct Block_basic *bb = (void *)work;
dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
The _dispatch_sync_slow function also ends up calling dispatch_sync_f, so dispatch_sync is essentially dispatch_sync_f.
void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// Serial queues, including the main queue
if (fastpath(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}
// Global queues have no do_targetq; the main queue, the management queue, and custom queues all have one
if (slowpath(!dq->do_targetq)) {
// the global root queues do not need strict ordering
(void)dispatch_atomic_add2o(dq, dq_running, 2);
return _dispatch_sync_f_invoke(dq, ctxt, func);
}
// Other queues
_dispatch_sync_f2(dq, ctxt, func);
}
Synchronous task execution has three cases:
- For a serial queue, the barrier synchronization function dispatch_barrier_sync_f is executed;
- For a global queue, _dispatch_sync_f_invoke is executed;
- For any other queue, _dispatch_sync_f2 is executed.
4.2 Key function: dispatch_barrier_sync_f
Before looking at dispatch_barrier_sync_f, let's look at dispatch_barrier_sync, the synchronous barrier function. Its implementation is as follows:
void dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void)) {
#if DISPATCH_COCOA_COMPAT
if (slowpath(dq == &_dispatch_main_q)) {
// Call dispatch_barrier_sync_f internally
return _dispatch_barrier_sync_slow(dq, work);
}
#endif
struct Block_basic *bb = (void *)work;
dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
It also calls dispatch_barrier_sync_f underneath. When a synchronous task is submitted to a serial queue, the task must wait for the previous task to complete, and the source uses dispatch_barrier_sync_f to achieve this.
DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    // Step 1: If the serial queue already holds other tasks or is suspended, enter _dispatch_barrier_sync_f_slow,
    // wait for the other tasks in the queue to complete (signaled through a semaphore), then execute the task.
    // In most cases this does not happen
    if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
        return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
    }
    // Step 2: Check dq_running. If the queue is already running something, enter _dispatch_barrier_sync_f_slow and wait.
    // bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval, ...)
    // compares *ptr with oldval and, if they are equal, writes newval to *ptr and returns true.
    // dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1) therefore means: if dq->dq_running is 0,
    // set dq->dq_running to 1 and return true
    if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
        // global queues and main queue bound to main thread always falls into
        // the slow case
        return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
    }
    // Step 3: The target queue has a target queue of its own; find the real target queue, which leads back to dispatch_sync_f
    if (slowpath(dq->do_targetq->do_targetq)) {
        return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
    }
    // Step 4: The queue has no pending task. Call _dispatch_barrier_sync_f_invoke to execute the task;
    // it calls _dispatch_function_invoke internally
    _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}
Three functions are involved:
- _dispatch_barrier_sync_f_slow uses the thread's semaphore internally and calls its wait method.
- _dispatch_barrier_sync_f_recurse calls dispatch_sync_f internally to look for the real target queue again.
- If the queue has no pending tasks, _dispatch_barrier_sync_f_invoke executes the task; executing the task calls _dispatch_function_invoke.
4.3 _dispatch_barrier_sync_f_invoke
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
    // Inlined implementation of _dispatch_function_invoke:
    // set dispatch_queue_key of the current thread to dq, execute the task,
    // then restore old_dq
    dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
    // If the queue holds other tasks, wake them through the semaphore and move on to the next task
    if (slowpath(dq->dq_items_tail)) {
        return _dispatch_barrier_sync_f2(dq);
    }
    // dispatch_atomic_dec2o calls the GCC built-in __sync_sub_and_fetch to subtract atomically,
    // so this line decrements dq_running by 1 and checks whether the result is 0.
    // _dispatch_wakeup is the function that wakes the queue
    if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
        _dispatch_wakeup(dq);
    }
}
4.4 GCD deadlocks
After reviewing the code comments above, let’s think about how deadlocks occur. Take a look at the sample code:
#import "DeadLock.h"
@implementation DeadLock
- (instancetype)init {
if (self = [super init]) {
// [self _mianQueueDeadLock];
[self _serialQueueDeadLock];
}
return self;
}
#pragma mark - Private
- (void)_mianQueueDeadLock {
    dispatch_sync(dispatch_get_main_queue(), ^(void) {
        NSLog(@"This place is deadlocked.");
    });
}
- (void)_serialQueueDeadLock {
    dispatch_queue_t queue1 = dispatch_queue_create("1serialQueue", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t queue2 = dispatch_queue_create("2serialQueue", DISPATCH_QUEUE_SERIAL);
    dispatch_sync(queue1, ^{
        NSLog(@"11111");
        dispatch_sync(queue1, ^{
            // Using queue2 here does not deadlock; using queue1 does
            NSLog(@"22222");
        });
    });
}
@end
Take _serialQueueDeadLock as an example: the first dispatch_sync on the serial queue reaches step 4 and executes its task directly. The nested dispatch_sync finds the queue already running, takes the slow path, pushes an item carrying a semaphore onto the queue, and waits on that semaphore. The first task cannot finish until the nested one returns, so the semaphore is never signaled and the thread deadlocks. Take _mianQueueDeadLock as an example: the main queue takes the slow path at step 2 and deadlocks.
4.5 _dispatch_sync_f_invoke
If the current queue is a global queue, _dispatch_sync_f_invoke is called.
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// Execute the task
_dispatch_function_invoke(dq, ctxt, func);
// Subtract 2 from dq->dq_running; if the result is 0, wake the queue
if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
_dispatch_wakeup(dq);
}
}
This function executes the incoming task through _dispatch_function_invoke, then uses dq_running to check whether the queue is still active and, if it is not, calls the wake-up function. The wake-up function _dispatch_wakeup(dq) is covered in the analysis of asynchronous execution.
4.6 Key function: _dispatch_sync_f2
For any other queue, _dispatch_sync_f2 is executed. Such a queue can be thought of as a custom concurrent queue.
static void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
    // 1) ensure that this thread hasn't enqueued anything ahead of this call
    // 2) the queue is not suspended
    // Step 1: If the queue holds other tasks or is suspended, push an item carrying a semaphore onto the queue and wait
    if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
        return _dispatch_sync_f_slow(dq, ctxt, func);
    }
    // Step 2: Add 2 to dq_running. If the low bit of the result is set, take _dispatch_sync_f_slow2,
    // which also ends up in _dispatch_sync_f_slow but adds a _dispatch_wakeup call
    if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
        return _dispatch_sync_f_slow2(dq, ctxt, func);
    }
    // Step 3: The target queue has a target queue of its own; find the real target queue
    if (slowpath(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_f_recurse(dq, ctxt, func);
    }
    // Step 4: The queue has no pending task. Call _dispatch_sync_f_invoke to execute the task;
    // it calls _dispatch_function_invoke internally
    _dispatch_sync_f_invoke(dq, ctxt, func);
}
Three functions are involved:
- _dispatch_sync_f_slow uses the thread's semaphore internally and calls its wait method.
- _dispatch_sync_f_recurse calls dispatch_sync_f internally to look for the real target queue again.
- If the queue has no pending tasks, _dispatch_sync_f_invoke executes the task; executing the task calls _dispatch_function_invoke.
As the code above shows, synchronous execution on a queue is sequential, regardless of whether the queue is serial or concurrent. Tasks are queued in FIFO order, and each task waits until the previous one completes, which produces this sequential execution.
The flow of synchronous execution is summarized in the figure below:
5. Queue asynchrony: dispatch_async analysis
The test code
Serial queue test
let sQueue = DispatchQueue(label: "sQueue")
print(1)
sQueue.async { print("\ [2):\(Thread.current)")}print(3)
sQueue.async { print("\ [4):\(Thread.current)")}print(5)
The execution result
1
3
5
2: <NSThread: 0x600000B884C0>{number = 4, name = (null)}
4: <NSThread: 0x600000B884C0>{number = 4, name = (null)}
Concurrent queue testing
let cQueue = DispatchQueue(label: "cQueue", attributes: [.concurrent])
print(1)
cQueue.async { print("2: \(Thread.current)") }
print(3)
cQueue.async { print("4: \(Thread.current)") }
print(5)
The execution result
1
3
5
4: <NSThread: 0x600002bc69C0>{number = 6, name = (null)}
2: <NSThread: 0x600002BC84C0>{number = 5, name = (null)}
From the test code above we can conclude:
- Dispatching a task asynchronously gives the queue the ability to start new threads.
- A serial queue other than the main queue starts one new thread, does not block the current thread, and executes all tasks in order.
- A concurrent queue starts multiple threads, with the number of threads determined by the system, and its tasks are executed in no particular order.
5.1 Entry function: dispatch_async
dispatch_async
void dispatch_async(dispatch_queue_t dq, void (^work)(void)) {
dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
dispatch_async copies the block from the stack to the heap (or increases its reference count) so that the block is not destroyed before it executes, and passes _dispatch_call_block_and_release as the function that runs the block and then releases it. It then calls dispatch_async_f.
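The copy-then-call-and-release pattern described above can be sketched in plain C without the Blocks extension. Everything here is an assumption made for illustration: `toy_block`, `toy_slot`, `toy_async`, and `toy_run` are hypothetical names, the one-element slot stands in for a real queue, and `toy_call_block_and_release` only imitates the role of `_dispatch_call_block_and_release`.

```c
#include <assert.h>
#include <stdlib.h>

typedef void (*toy_function_t)(void *);

/* Hypothetical closure standing in for a block: a body plus captured state. */
struct toy_block {
    void (*body)(int *out);
    int *out;
};

/* Hypothetical one-element "queue" holding a (context, function) pair. */
struct toy_slot {
    void *ctxt;
    toy_function_t func;
};

/* Copies the caller's block to the heap so it outlives the caller's frame. */
static void *toy_block_copy(const struct toy_block *stack_block) {
    struct toy_block *heap = malloc(sizeof *heap);
    assert(heap != NULL);
    *heap = *stack_block;
    return heap;
}

/* Runs the heap copy, then frees it. */
static void toy_call_block_and_release(void *ctxt) {
    struct toy_block *b = ctxt;
    b->body(b->out);
    free(b);
}

/* Stores the heap copy as the context and the trampoline as the function. */
static void toy_async(struct toy_slot *slot, const struct toy_block *work) {
    slot->ctxt = toy_block_copy(work);
    slot->func = toy_call_block_and_release;
}

/* Executes the stored work later, as a worker thread would. */
static void toy_run(struct toy_slot *slot) {
    slot->func(slot->ctxt);
    slot->ctxt = NULL;
    slot->func = NULL;
}

/* Example body: increments the captured counter. */
static void toy_add_one(int *out) { *out += 1; }
```

Calling `toy_async` with a `toy_block` declared in an inner scope and then `toy_run` after that scope has ended still works, because the slot holds the heap copy rather than the original.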
dispatch_async_f
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
dispatch_continuation_t dc;
// No fastpath/slowpath hint because we simply don't know
// Serial queue (width 1): forward to dispatch_barrier_async_f
if (dq->dq_width == 1) {
return dispatch_barrier_async_f(dq, ctxt, func);
}
// Get a dispatch_continuation_t structure from thread private data
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_async_f_slow(dq, ctxt, func);
}
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
dc->dc_func = func;
dc->dc_ctxt = ctxt;
// No fastpath/slowpath hint because we simply don't know
// There is a destination queue, call _dispatch_async_f2 for forwarding.
if (dq->do_targetq) {
return _dispatch_async_f2(dq, dc);
}
// The global queue directly joins the queue
_dispatch_queue_push(dq, dc);
}
From the above source code we can see that dispatch_async_f can be roughly divided into three cases:
- If it is a serial queue, dispatch_barrier_async_f is called;
- If it is another queue that has a target queue, _dispatch_async_f2 is called;
- If it is a global queue, _dispatch_queue_push is called directly to enqueue the task.
The flow of dispatch_async_f is described as follows:
Although there are three cases above, they all end in either _dispatch_queue_push or _dispatch_async_f2, and _dispatch_async_f2 also enqueues the task. So in essence dispatch_async_f calls _dispatch_queue_push to enqueue tasks.
5.2 The dispatch_continuation_t structure
The source code above refers to the dispatch_continuation_t structure, which encapsulates the asynchronous block we pass in. Take the implementation of dispatch_barrier_async_f as an example:
void dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
dispatch_continuation_t dc;
// Get a dispatch_continuation_t structure from thread private data.
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
// _dispatch_barrier_async_f_slow also ends up enqueuing the task
return _dispatch_barrier_async_f_slow(dq, ctxt, func);
}
// DISPATCH_OBJ_BARRIER_BIT marks this continuation as a barrier
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
// use _dispatch_call_block_and_release as func
dc->dc_func = func;
// Use the passed block as the context
dc->dc_ctxt = ctxt;
// join the queue operation
_dispatch_queue_push(dq, dc);
}
Note the assignment of do_vtable in dispatch_continuation_t.
// An asynchronous task on a serial queue, or a dispatch_barrier_async task, carries the DISPATCH_OBJ_BARRIER_BIT barrier flag
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
// not barrier
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
libdispatch defines four flags in total:
#define DISPATCH_OBJ_ASYNC_BIT 0x1 // asynchronous
#define DISPATCH_OBJ_BARRIER_BIT 0x2 // barrier
#define DISPATCH_OBJ_GROUP_BIT 0x4 // group
#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8 // synchronous slow path
The DISPATCH_OBJ_BARRIER_BIT flag makes a task submitted asynchronously to a serial queue behave as a barrier: tasks behind it wait until it has finished.
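The way these flags are packed into the pointer-sized do_vtable field and tested later can be shown with a short sketch. The four flag values are the ones listed above; `toy_make_flags` and `toy_has_flag` are hypothetical helpers introduced only for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DISPATCH_OBJ_ASYNC_BIT     0x1
#define DISPATCH_OBJ_BARRIER_BIT   0x2
#define DISPATCH_OBJ_GROUP_BIT     0x4
#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8

/* Builds the value stored in a continuation's do_vtable field (hypothetical helper). */
static void *toy_make_flags(bool barrier) {
    uintptr_t bits = DISPATCH_OBJ_ASYNC_BIT;
    if (barrier) {
        bits |= DISPATCH_OBJ_BARRIER_BIT;
    }
    return (void *)bits;
}

/* Tests one flag, mirroring checks such as (long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT. */
static bool toy_has_flag(const void *do_vtable, uintptr_t bit) {
    return ((uintptr_t)do_vtable & bit) != 0;
}
```

With these helpers, a serial-queue submission corresponds to `toy_make_flags(true)` (value 0x3) and a plain concurrent submission to `toy_make_flags(false)` (value 0x1).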
5.3 Task Dispatch: _dispatch_queue_push
_dispatch_queue_push is a macro definition that ends up executing the _dispatch_queue_push_list function.
#define _dispatch_queue_push(x, y) _dispatch_queue_push_list((x), (y), (y))
#define _dispatch_queue_push_list _dispatch_trace_queue_push_list
_dispatch_trace_queue_push_list
void _dispatch_trace_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head, dispatch_object_t _tail) {
// Only trace when the queue-push probe is enabled
if (slowpath(DISPATCH_QUEUE_PUSH_ENABLED())) {
struct dispatch_object_s *dou = _head._do;
do {
// Trace each object being pushed (a dispatch_continuation_s)
_dispatch_trace_continuation(dq, dou, DISPATCH_QUEUE_PUSH);
} while (dou != _tail._do && (dou = dou->do_next));
}
_dispatch_queue_push_list(dq, _head, _tail);
}
_dispatch_queue_push_list
void _dispatch_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head, dispatch_object_t _tail) {
struct dispatch_object_s *prev, *head = _head._do, *tail = _tail._do;
tail->do_next = NULL;
dispatch_atomic_store_barrier();
// dispatch_atomic_xchg2o essentially calls ((typeof(*(p)))__sync_swap((p), (n))), which sets *p to n and returns the previous value of *p.
// dispatch_atomic_xchg2o(dq, dq_items_tail, tail) therefore performs dq->dq_items_tail = tail and returns the old tail
prev = fastpath(dispatch_atomic_xchg2o(dq, dq_items_tail, tail));
if (prev) {
// if we crash here with a value less than 0x1000, then we are at a
// known bug in client code for example, see _dispatch_queue_dispose
// or _dispatch_atfork_child
// Prev is the original end of the queue. If there are other elements in the queue, the pushed object is added to the end of the queue.
prev->do_next = head;
} else {
// If the queue was empty, take the slow path
_dispatch_queue_push_list_slow(dq, head);
}
}
_dispatch_queue_push_list_slow
void
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
struct dispatch_object_s *obj)
{
// Set dq->dq_items_head to the pushed object and wake up the queue. The queue was empty with no task executing, so it is dormant and must be woken
_dispatch_retain(dq);
dq->dq_items_head = obj;
_dispatch_wakeup(dq);
_dispatch_release(dq);
}
From this analysis of asynchronous enqueueing we can see that enqueueing only appends tasks to the queue one by one in FIFO order; something else must still trigger their execution at a later point.
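The enqueue technique above (swap the tail atomically, then either link the previous tail or take the slow path for an empty queue) can be sketched with C11 atomics. This is a simplified model, not the libdispatch implementation: `toy_node`, `toy_fifo`, `toy_fifo_init`, and `toy_fifo_push` are hypothetical names, and the boolean return value stands in for "the slow path that would call _dispatch_wakeup".

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical node standing in for dispatch_object_s. */
struct toy_node {
    _Atomic(struct toy_node *) next;
    int value;
};

/* Hypothetical queue standing in for dq_items_head / dq_items_tail. */
struct toy_fifo {
    _Atomic(struct toy_node *) head;
    _Atomic(struct toy_node *) tail;
};

static void toy_fifo_init(struct toy_fifo *q) {
    atomic_init(&q->head, NULL);
    atomic_init(&q->tail, NULL);
}

/* Appends n. Returns true when the queue was empty, i.e. the caller
 * would need to wake the queue; false when n was linked behind the old tail. */
static bool toy_fifo_push(struct toy_fifo *q, struct toy_node *n) {
    assert(q != NULL && n != NULL);
    atomic_store(&n->next, NULL);
    /* Publish n as the new tail and learn what the tail was before. */
    struct toy_node *prev = atomic_exchange(&q->tail, n);
    if (prev != NULL) {
        atomic_store(&prev->next, n);   /* fast path: link behind the old tail */
        return false;
    }
    atomic_store(&q->head, n);          /* slow path: queue was empty */
    return true;
}
```

Only the first push into an empty queue reports `true`, which matches the observation that the queue needs waking only when it had nothing queued.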
5.4 Wake up queue: _dispatch_wakeup
Both the synchronous and the asynchronous paths call _dispatch_wakeup, which wakes up the current queue.
_dispatch_wakeup
dispatch_queue_t _dispatch_wakeup(dispatch_object_t dou) {
dispatch_queue_t tq;
if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
return NULL;
}
if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
return NULL;
}
// dispatch_atomic_cmpxchg2o succeeds only if dou._do->do_suspend_cnt == 0,
// in which case it sets do_suspend_cnt to DISPATCH_OBJECT_SUSPEND_LOCK
if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0, DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
// _dispatch_queue_wakeup_main wakes up the main queue
if (dou._dq == &_dispatch_main_q) {
_dispatch_queue_wakeup_main();
}
#endif
return NULL;
}
// Push this queue onto its target queue with _dispatch_queue_push
_dispatch_retain(dou._do);
tq = dou._do->do_targetq;
_dispatch_queue_push(tq, dou._do);
return tq;
}
The code above only shows explicit handling for the main queue and custom queues, with nothing specific to the global queue. Global queues are handled through the dx_probe(dou._do) call, which resolves as follows:
#define dx_probe(x) (x)->do_vtable->do_probe(x)
// dx_probe(dou._do) is equivalent to (dou._do)->do_vtable->do_probe(dou._do)
// Global queue do_vtable: _dispatch_queue_root_vtable
// _dispatch_queue_root_vtable definition
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_wakeup_global,
};
// Global queue:
// globalQueue -> _dispatch_queue_root_vtable -> _dispatch_queue_wakeup_global
As you can see from the above code, _dispatch_wakeup is divided into four cases:
- The main queue calls _dispatch_queue_wakeup_main;
- Global queues call _dispatch_queue_wakeup_global;
- Other queues are pushed onto their target queue, where the enqueue operation continues;
- The manager queue calls _dispatch_mgr_wakeup, which mainly serves dispatch_source.
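The vtable-based dispatch behind dx_probe can be illustrated with a small function-pointer table. This is a sketch under assumptions: `toy_object`, `toy_vtable`, `toy_probe`, and both probe functions are hypothetical, and the no-op probe for the custom queue is an illustrative choice rather than a statement about what libdispatch installs there.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_object;

/* Hypothetical vtable: a kind string plus a probe callback. */
struct toy_vtable {
    const char *kind;
    bool (*probe)(struct toy_object *);
};

/* Hypothetical object that points at its vtable, like do_vtable. */
struct toy_object {
    const struct toy_vtable *vtable;
    int wakeups;   /* counts how often the global wakeup ran */
};

/* Same shape as: #define dx_probe(x) (x)->do_vtable->do_probe(x) */
#define toy_probe(x) ((x)->vtable->probe(x))

/* Stands in for _dispatch_queue_wakeup_global: does its work, returns false. */
static bool toy_wakeup_global(struct toy_object *o) {
    o->wakeups++;
    return false;
}

/* Illustrative probe for a custom queue: nothing to do. */
static bool toy_probe_noop(struct toy_object *o) {
    (void)o;
    return false;
}

static const struct toy_vtable toy_root_vtable = { "global-queue", toy_wakeup_global };
static const struct toy_vtable toy_custom_vtable = { "queue", toy_probe_noop };
```

The caller never names the wakeup function; the same `toy_probe` call reaches a different function depending on which vtable the object carries.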
_dispatch_queue_wakeup_main
The _dispatch_queue_wakeup_main function wakes up the main thread's RunLoop, as mentioned earlier in Reworking the Runloop Principle:
With GCD asynchrony, we rely on RunLoop to return to the main thread after a child thread has done something.
Previously we verified this from console output; now we can verify it in the source code:
void _dispatch_queue_wakeup_main(void) {
kern_return_t kr;
// Run _dispatch_main_q_port_init once; its implementation is shown below
dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
_dispatch_main_q_port_init);
// Wake up the main thread through the Mach port, which is registered as a source1 in the RunLoop
kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
switch (kr) {
case MACH_SEND_TIMEOUT:
case MACH_SEND_TIMED_OUT:
case MACH_SEND_INVALID_DEST:
break;
default:
(void)dispatch_assume_zero(kr);
break;
}
_dispatch_safe_fork = false;
}
// Implementation of _dispatch_main_q_port_init: waking up the RunLoop requires a Mach port
void _dispatch_main_q_port_init(void *ctxt DISPATCH_UNUSED) {
kern_return_t kr;
kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
&main_q_port);
DISPATCH_VERIFY_MIG(kr);
(void)dispatch_assume_zero(kr);
kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port,
MACH_MSG_TYPE_MAKE_SEND);
DISPATCH_VERIFY_MIG(kr);
(void)dispatch_assume_zero(kr);
_dispatch_program_is_probably_callback_driven = true;
_dispatch_safe_fork = false;
}
_dispatch_queue_wakeup_global
static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq) {
static dispatch_once_t pred;
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
int r;
if (!dq->dq_items_tail) return false;
_dispatch_safe_fork = false;
dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
// Context and root queue initialization. The thread pool is initialized inside the root queue
dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
// _dispatch_queue_wakeup_global can wake up either a pthread_workqueue or the thread pool
// 1. Support pthread_workqueue
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
// If dgq_kworkqueue exists, pthread_workqueue_additem_np is called
// dgq_kworkqueue is the handle of the pthread workqueue whose kernel-managed threads run the queued work items.
// Dispatch queues with different priorities correspond to workqueues with different priorities.
// _dispatch_root_queues_init creates the pthread_workqueue with pthread_workqueue_create_np during initialization
if (qc->dgq_kworkqueue)
#endif
{
if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
pthread_workitem_handle_t wh;
unsigned int gen_cnt;
_dispatch_debug("requesting new worker thread");
// This function uses the workq_kernreturn system call to tell the workqueue to add an item that should be executed.
// Based on this notification, the XNU kernel decides whether to create threads according to the system state. For an overcommit priority queue, the workqueue always creates a thread.
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
(void)dispatch_assume_zero(r);
} else {
_dispatch_debug("work thread request still pending on global "
"queue: %p", dq);
}
goto out;
}
#endif // HAVE_PTHREAD_WORKQUEUES
// 2. Thread pool is supported
#if DISPATCH_ENABLE_THREAD_POOL
// Signal the mediator semaphore; if an idle pool thread was waiting on it, that thread picks up the work
if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
goto out;
}
// Check the remaining capacity of the thread pool. If the pool is full, jump to out (return false); otherwise decrement the remaining count by 1
pthread_t pthr;
int t_count;
do {
t_count = qc->dgq_thread_pool_size;
if (!t_count) {
_dispatch_debug("The thread pool is full: %p", dq);
goto out;
}
} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count, t_count - 1));
// qc->dgq_thread_pool_size is now t_count - 1
// Create a thread using pthread.
// execute _dispatch_worker_thread and _dispatch_worker_thread will eventually call _dispatch_worker_thread2
while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
sleep(1);
}
// Detach pthr so that its resources are reclaimed when it exits
r = pthread_detach(pthr);
(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL
out:
return false;
}
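The thread pool branch above reserves a slot with a compare-and-swap loop: read the remaining count, give up if it is zero, otherwise try to replace it with count - 1 and retry if another thread changed it first. A minimal sketch with C11 atomics follows; `toy_pool_take_slot` is a hypothetical name and the counter stands in for dgq_thread_pool_size.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Tries to reserve one thread slot. Returns false when no slot is left. */
static bool toy_pool_take_slot(atomic_int *remaining) {
    assert(remaining != NULL);
    int count = atomic_load(remaining);
    do {
        if (count == 0) {
            return false;   /* pool is full: the caller would jump to out */
        }
        /* On failure, count is reloaded with the current value and we retry. */
    } while (!atomic_compare_exchange_weak(remaining, &count, count - 1));
    return true;
}
```

Because the decrement only succeeds when the counter still holds the value that was read, two threads can never both take the last slot.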
5.5 Scheduling Tasks in queues
Task scheduling for the main queue: _dispatch_main_queue_callback_4CF
From the source code analysis above, we know that the main queue calls _dispatch_send_wakeup_main_thread when it is woken up, but the implementation of that function is not open source, so we cannot see the main queue's task scheduling directly. By printing the function call stack, we can see that the main queue is scheduled by the _dispatch_main_queue_callback_4CF function.
_dispatch_main_queue_callback_4CF is called after __CFRUNLOOP_IS_SERVICING_THE_MAIN_DISPATCH_QUEUE__.
// Process main queue tasks
void _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED) {
if (main_q_is_draining) {
return;
}
// Set the status to true while the task is being processed
_dispatch_queue_set_mainq_drain_state(true);
// Schedule processing tasks
_dispatch_main_queue_drain();
// Complete the task and restore to false
_dispatch_queue_set_mainq_drain_state(false);
}
The main queue is a serial queue that executes tasks in order, so there is no concurrency logic. Scheduling the main queue is a sequential traversal: each wakeup of the main queue takes the DCs (dispatch continuations) due in this round and executes them, and tasks added later are executed the next time the main queue is woken up. This is roughly what the _dispatch_main_queue_drain function does.
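The guard in _dispatch_main_queue_callback_4CF (return early if a drain is already in progress, otherwise set the flag, drain, and clear it) can be sketched as follows. This is a simplified model: `toy_task`, `toy_main_queue`, `toy_main_queue_callback`, and `toy_reenter` are hypothetical, and the loop simply runs every queued task in order, which is simpler than what _dispatch_main_queue_drain actually does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical task: a function, its context, and a link to the next task. */
struct toy_task {
    void (*func)(void *);
    void *ctxt;
    struct toy_task *next;
};

/* Hypothetical main queue with a drain-in-progress flag. */
struct toy_main_queue {
    struct toy_task *head;
    bool is_draining;
    int drains;          /* how many drains actually started */
};

static void toy_main_queue_callback(struct toy_main_queue *q) {
    assert(q != NULL);
    if (q->is_draining) {
        return;                  /* re-entrant call while draining: ignore */
    }
    q->is_draining = true;
    q->drains++;
    while (q->head != NULL) {    /* run tasks strictly in FIFO order */
        struct toy_task *t = q->head;
        q->head = t->next;
        t->func(t->ctxt);
    }
    q->is_draining = false;
}

/* A task that tries to trigger another drain from inside a drain. */
static void toy_reenter(void *ctxt) {
    toy_main_queue_callback(ctxt);
}
```

A task that calls back into `toy_main_queue_callback` during a drain returns immediately, so the tasks are still executed once each and in order.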
Global queue task scheduling: _dispatch_worker_thread2
Global queues are woken up through the _dispatch_queue_wakeup_global function, after which _dispatch_worker_thread2 is called to process the tasks in the corresponding queue.
Two functions are important in the implementation of _dispatch_worker_thread2:
- _dispatch_queue_concurrent_drain_one;
- _dispatch_continuation_pop.
_dispatch_queue_concurrent_drain_one
The _dispatch_queue_concurrent_drain_one function handles the following:
- Edge cases when multiple threads compete for the queue;
- Dequeuing the next DC;
- Waking up the global queue again.
static struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq) {
struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
// The mediator value acts both as a "lock" and a signal
head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator);
// Check whether the queue is empty. If so, return NULL
if (slowpath(head == NULL)) {
(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL);
_dispatch_debug("no work on global work queue");
return NULL;
}
if (slowpath(head == mediator)) {
// This thread lost the race for ownership of the queue, which means the ratio of useful work to libdispatch overhead is poor.
// It implies that there are too many threads in the pool, so a new pending thread is requested
// and this thread exits; the kernel grants a new thread when the load drops
_dispatch_debug("Contention on queue: %p", dq);
_dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
return NULL;
}
// Save the do_next of the head pointer before returning. If next is NULL, this means that item is the last one
next = fastpath(head->do_next);
if (slowpath(!next)) {
dq->dq_items_head = NULL;
if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL)) {
// Both the head and tail pointers are now NULL
goto out;
}
// There must be a next item now. This thread will not wait long.
while (!(next = head->do_next)) {
_dispatch_hardware_pause();
}
}
dq->dq_items_head = next;
// Wake up the global queue again
_dispatch_queue_wakeup_global(dq);
out:
// Return the DC to be processed
return head;
}
My understanding of why _dispatch_queue_wakeup_global is called again to wake up the global queue: DCs in a concurrent queue execute concurrently, so each time a DC is dequeued, the queue is checked for remaining DCs. If there are any, the workqueue is asked for another worker thread to process them, and that thread repeats the same steps, much like a recursive process. When multiple worker threads process multiple DCs at the same time, we see the asynchronous effect.
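The mediator technique used by _dispatch_queue_concurrent_drain_one (swap a sentinel into the head so that competing threads can tell a dequeue is in progress) can be modelled with C11 atomics. This is a sketch, not the libdispatch code: `toy_node`, `toy_fifo`, `TOY_MEDIATOR`, and `toy_drain_one` are hypothetical names, the wakeup calls are omitted, and the busy-wait uses an empty loop where the original calls _dispatch_hardware_pause.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

struct toy_node {
    _Atomic(struct toy_node *) next;
    int value;
};

struct toy_fifo {
    _Atomic(struct toy_node *) head;
    _Atomic(struct toy_node *) tail;
};

/* Sentinel that is never a valid node address, like (void *)~0ul. */
#define TOY_MEDIATOR ((struct toy_node *)~(uintptr_t)0)

/* Removes and returns the first node, or NULL if the queue is empty
 * or another thread is in the middle of a dequeue. */
static struct toy_node *toy_drain_one(struct toy_fifo *q) {
    assert(q != NULL);
    struct toy_node *head = atomic_exchange(&q->head, TOY_MEDIATOR);
    if (head == NULL) {
        /* Empty queue: put NULL back unless a producer already set a head. */
        struct toy_node *expected = TOY_MEDIATOR;
        atomic_compare_exchange_strong(&q->head, &expected, NULL);
        return NULL;
    }
    if (head == TOY_MEDIATOR) {
        return NULL;    /* lost the race: someone else owns the head right now */
    }
    struct toy_node *next = atomic_load(&head->next);
    if (next == NULL) {
        atomic_store(&q->head, NULL);
        struct toy_node *expected = head;
        if (atomic_compare_exchange_strong(&q->tail, &expected, NULL)) {
            return head;    /* head was also the tail: queue is now empty */
        }
        /* A producer swapped the tail but has not linked its node yet. */
        while ((next = atomic_load(&head->next)) == NULL) {
        }
    }
    atomic_store(&q->head, next);
    return head;
}
```

While the sentinel sits in the head slot, any other thread that swaps the head gets the sentinel back and knows to retreat, which is the "lock and signal" role the source comment describes.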
_dispatch_continuation_pop
The _dispatch_continuation_pop function processes the dequeued item. The item may be an asynchronous task, a group task, a barrier task, or even a queue.
static inline void
_dispatch_continuation_pop(dispatch_object_t dou) {
dispatch_continuation_t dc = dou._dc;
dispatch_group_t dg;
_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
// If the object is a queue, hand it to _dispatch_queue_invoke to process that queue
// dispatch_barrier_async tasks take this branch so that barrier tasks stay separate from other tasks;
// a dispatch_semaphore_t is used to notify the barrier task to execute
if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
return _dispatch_queue_invoke(dou._dq);
}
// Add the item back to the cache before calling the function. This
// allows the 'hot' continuation to be used for a quick callback.
//
// The ccache version is per-thread.
// Therefore, the object has not been reused yet.
// This generates better assembly.
// Whether it is an asynchronous task
if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
_dispatch_continuation_free(dc);
}
// Check if it is a group task
if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
dg = dc->dc_group;
} else {
dg = NULL;
}
// Is a dispatch_continuation_t structure (DC) for task encapsulation, directly executing tasks.
// This is also when asynchronous blocks are called
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
if (dg) {
// For a group task, call dispatch_group_leave
dispatch_group_leave(dg);
_dispatch_release(dg);
}
}
_dispatch_queue_invoke
void
_dispatch_queue_invoke(dispatch_queue_t dq) {
if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
dispatch_atomic_acquire_barrier();
dispatch_queue_t otq = dq->do_targetq, tq = NULL;
_dispatch_queue_drain(dq);
if (dq->do_vtable->do_invoke) {
// Assume that object invoke checks it is executing on correct queue
tq = dx_invoke(dq);
} else if (slowpath(otq != dq->do_targetq)) {
// An item on the queue changed the target queue
tq = dq->do_targetq;
}
// We do not need to check the result.
// When the suspend-count lock is dropped, then the check will happen.
dispatch_atomic_release_barrier();
//dq_running minus 1, because the task is either executed directly or pushed to the target queue
(void)dispatch_atomic_dec2o(dq, dq_running);
if (tq) {
return _dispatch_queue_push(tq, dq);
}
}
dq->do_next = DISPATCH_OBJECT_LISTLESS;
if (!dispatch_atomic_sub2o(dq, do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK)) {
// The queue is idle and needs to wake up
if (dq->dq_running == 0) {
_dispatch_wakeup(dq); // verify that the queue is idle
}
}
// Release the queue
_dispatch_release(dq); // added when the queue is put on the list
}
The flow of asynchronous execution on a queue is shown below:
Conclusion
- Queues and threads can have a many-to-one relationship: tasks from different queues can be executed on one thread, including the main thread.
- The relationship between queue operations and starting threads:
  - Serial queue, synchronous: no new thread is started; tasks execute in order on the current thread, which may be blocked if earlier tasks have not completed.
  - Serial queue, asynchronous: one thread is started; tasks execute in order on the new thread without blocking the current thread.
  - Concurrent queue, synchronous: no new thread is started; tasks execute in order on the current thread, which may be blocked if earlier tasks have not completed.
  - Concurrent queue, asynchronous: new threads are started, with the number determined by the system; tasks execute on the new threads without blocking the current thread, in no particular order.
  - Main queue, synchronous: dispatching synchronously to the main queue from the main thread causes a deadlock.
  - Main queue, asynchronous: tasks execute in order on the main thread; a task does not start until the tasks ahead of it have completed.
- Whether a task is dispatched synchronously or asynchronously determines whether the queue is able to start threads, and whether the queue is serial or concurrent determines how many tasks are processed at the same time.
- dispatch_queue is implemented as a FIFO (first-in, first-out) queue using a struct and a linked list. Both serial and concurrent queues follow the FIFO principle. The main differences between the two are the execution order and the number of threads started.
- The dispatch_sync function typically executes on the current thread, using the semaphore bound to the thread for serialization.
- dispatch_async calls that target the main queue are handled by the RunLoop, while dispatch_async calls that target other queues are handled by thread pools.
- Blocks are not added to a queue directly; they are first wrapped in a dispatch_continuation structure, which holds the block and some context information. The queue appends these dispatch_continuation structures to its linked list. Whatever the type of queue, it is ultimately tied to a global queue. When a global queue executes a block, libdispatch takes the dispatch_continuation from the global queue and calls pthread_workqueue_additem_np, passing the global queue itself, the workqueue information matching its priority, and the callback function for the dispatch_continuation as parameters. pthread_workqueue_additem_np uses the workq_kernreturn system call to tell the workqueue to add the item to be executed. Based on this notification, the XNU kernel decides whether to create threads according to the system state; if the global queue has overcommit priority, a thread is always created. The workqueue thread runs the pthread_workqueue function, which calls the libdispatch callback, and that callback executes the block held in the dispatch_continuation.
- A GCD deadlock is caused by the queue, not the thread: _dispatch_barrier_sync_f_slow waits on the semaphore associated with the current thread, so the thread blocks while the queued task it is waiting for can never start.
- On the barrier function dispatch_barrier_async: it is only meaningful on a queue created with the DISPATCH_QUEUE_CONCURRENT attribute; on a global concurrent queue it behaves the same as dispatch_async. The reason: if the queue passed to dispatch_barrier_async is a global queue, waking the queue runs _dispatch_queue_wakeup_global, which does the same thing as dispatch_async. If it is a custom queue, _dispatch_continuation_pop runs _dispatch_queue_invoke. Its while loop takes tasks out one by one and calls _dispatch_continuation_redirect so that the blocks execute concurrently. When a task marked DISPATCH_OBJ_BARRIER_BIT is encountered, the do_suspend_cnt flag is modified so that subsequent iterations of the while loop go directly to out. Once the barrier block completes, _dispatch_queue_class_invoke resets do_suspend_cnt so that the tasks after the barrier block continue.