Last time we looked at queue creation. This time we start on the related GCD functions, beginning with the two we use most: dispatch_async and dispatch_sync.
Reading the GCD source means working through many structures whose definitions are assembled from macros, so we need to expand the macros step by step to understand the code.
dispatch_async
When we submit a task to a queue, both the block form and the function form are eventually wrapped in a dispatch_continuation_s, which we can think of as a structure describing the task. Creating this structure is the first thing dispatch_async does.
That is, both dispatch_async (which submits a block asynchronously) and dispatch_async_f (which submits a function asynchronously) wrap the submitted task in a dispatch_continuation_s. The structure stores the function to execute in a function pointer (dc_func) and its argument in dc_ctxt. When a block is submitted, dc_func holds a function derived from the block (its invoke function, or a wrapper that calls and releases the block), not the block itself; the copied block goes in dc_ctxt.
The dispatch_async function can be logically divided into two parts. The first part wraps the work (_dispatch_continuation_init), and the second part pushes the wrapped task onto the queue, which eventually requests a thread to run it (_dispatch_continuation_async).
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    // Get a dispatch_continuation_s (from the cache, or newly allocated)
    dispatch_continuation_t dc = _dispatch_continuation_alloc(); // 1️⃣
    // DC_FLAG_CONSUME: the continuation's resources are released after it runs.
    // #define DC_FLAG_CONSUME 0x004ul
    uintptr_t dc_flags = DC_FLAG_CONSUME; // 2️⃣
    // dispatch_qos_t is an alias for uint32_t
    dispatch_qos_t qos;
    // Initialize and configure the fields of the dispatch_continuation_s
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags); // 3️⃣
    // Once the dispatch_continuation_s is ready, call _dispatch_continuation_async
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
Below we first analyze how dispatch_async initializes and configures the dispatch_continuation_s structure, and then move on to _dispatch_continuation_async to explore how this most commonly used asynchronous call is implemented.
With its macros expanded, the dispatch_continuation_s structure is defined as follows:
typedef struct dispatch_continuation_s {
union {
const void *do_vtable;
uintptr_t dc_flags;
};
union {
pthread_priority_t dc_priority;
int dc_cache_cnt;
uintptr_t dc_pad;
};
struct dispatch_continuation_s *volatile do_next; // Next task
struct voucher_s *dc_voucher;
// typedef void (*dispatch_function_t)(void *_Nullable);
dispatch_function_t dc_func; // A pointer to the function to be executed
void *dc_ctxt; // Method context (parameter)
void *dc_data; // Related data
void *dc_other; // Other information
} *dispatch_continuation_t;
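To make the role of dc_func and dc_ctxt concrete, here is a minimal sketch in plain C. The names mini_continuation_t, mini_continuation_init, mini_continuation_invoke and increment are hypothetical and only model two of the fields above; this is not libdispatch code.

```c
#include <stddef.h>

/* Hypothetical, simplified stand-in for dispatch_continuation_s:
 * only the list link, the function pointer and its context are modelled. */
typedef void (*task_function_t)(void *);

typedef struct mini_continuation_s {
    struct mini_continuation_s *do_next; /* next task in a list */
    task_function_t dc_func;             /* function to execute */
    void *dc_ctxt;                       /* argument passed to dc_func */
} mini_continuation_t;

/* Wrap a function and its context into a continuation. */
static void mini_continuation_init(mini_continuation_t *dc,
        task_function_t func, void *ctxt)
{
    dc->do_next = NULL;
    dc->dc_func = func;
    dc->dc_ctxt = ctxt;
}

/* Running the task is just calling the stored function with its context. */
static void mini_continuation_invoke(mini_continuation_t *dc)
{
    dc->dc_func(dc->dc_ctxt);
}

/* Example task: adds 1 to the int that ctxt points to. */
static void increment(void *ctxt)
{
    *(int *)ctxt += 1;
}
```

Calling mini_continuation_init(&dc, increment, &counter) followed by mini_continuation_invoke(&dc) increments counter once. The queue never needs to know what the task does, only which function to call and with which argument.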
_dispatch_continuation_alloc
_dispatch_continuation_alloc first calls _dispatch_continuation_alloc_cacheonly to try to take a dispatch_continuation_s from the cache. If the cache is empty, it calls _dispatch_continuation_alloc_from_heap to create one on the heap.
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_continuation_alloc(void)
{
    dispatch_continuation_t dc =
            _dispatch_continuation_alloc_cacheonly();
    // Create the dispatch_continuation_s on the heap if the cache is empty
    if (unlikely(!dc)) {
        return _dispatch_continuation_alloc_from_heap();
    }
    return dc;
}
_dispatch_continuation_alloc_cacheonly
_dispatch_continuation_alloc_cacheonly calls _dispatch_thread_getspecific with dispatch_cache_key as the key to fetch a dispatch_continuation_t from the current thread's storage and assigns it to dc. If one is found, it calls _dispatch_thread_setspecific to store dc's do_next under the same key (that is, it pops the head of the current thread's list of cached continuations).
DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_continuation_alloc_cacheonly(void)
{
dispatch_continuation_t dc = (dispatch_continuation_t)
_dispatch_thread_getspecific(dispatch_cache_key);
// Update the value of dispatch_cache_key as the key stored in thread storage
if (likely(dc)) {
_dispatch_thread_setspecific(dispatch_cache_key, dc->do_next);
}
return dc;
}
#define _dispatch_thread_getspecific(key) \
(_dispatch_get_tsd_base()->key)
#define _dispatch_thread_setspecific(key, value) \
(void)(_dispatch_get_tsd_base()->key = (value))
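The cache lookup above is essentially a pop from a per-thread free list. The following sketch illustrates that idea with a C11 _Thread_local variable standing in for the value stored under dispatch_cache_key. All names here (node_t, cache_head, node_alloc_cacheonly, node_alloc, node_cache_push) are hypothetical, and the real cache also handles details this sketch ignores.

```c
#include <stdlib.h>

typedef struct node_s {
    struct node_s *do_next; /* next cached node */
} node_t;

/* Hypothetical per-thread cache head (plays the role of the TSD slot). */
static _Thread_local node_t *cache_head;

/* Pop the head of this thread's cache; returns NULL if it is empty. */
static node_t *node_alloc_cacheonly(void)
{
    node_t *n = cache_head;
    if (n) {
        cache_head = n->do_next; /* the next node becomes the new head */
    }
    return n;
}

/* Take a node from the cache, or fall back to the heap. */
static node_t *node_alloc(void)
{
    node_t *n = node_alloc_cacheonly();
    if (!n) {
        n = calloc(1, sizeof(*n));
    }
    return n;
}

/* Return a node to this thread's cache (assumed counterpart of the pop). */
static void node_cache_push(node_t *n)
{
    n->do_next = cache_head;
    cache_head = n;
}
```

Because the list head lives in thread-local storage, neither the pop nor the push needs a lock, which is presumably why the fast path can stay this short.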
DC_FLAG
DC_FLAG_CONSUME: the continuation's resources are freed when it runs. It is set for async submissions and for non-event_handler source handlers.
#define DC_FLAG_CONSUME 0x004ul
DC_FLAG_BLOCK: the continuation's function is a block.
#define DC_FLAG_BLOCK 0x010ul
DC_FLAG_ALLOCATED: a bit used to ensure that the dc_flags of an allocated continuation is never 0.
#define DC_FLAG_ALLOCATED 0x100ul
dispatch_qos_t is an alias for uint32_t.
typedef uint32_t dispatch_qos_t;
_dispatch_continuation_init
The _dispatch_continuation_init function initializes dc from its input parameters.
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    // Copy the block to the heap
    void *ctxt = _dispatch_Block_copy(work);
    // The dc_flags argument passed by dispatch_async is DC_FLAG_CONSUME (0x004ul)
    // #define DC_FLAG_BLOCK 0x010ul
    // #define DC_FLAG_ALLOCATED 0x100ul
    // After merging the three flags, dc_flags = 0x114ul
    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    // Check whether the block's invoke function is _dispatch_block_special_invoke,
    // i.e. whether the block carries private data
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        // (slow-path initialization of the dispatch_continuation_s)
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }
    // Get the block's invoke function (as a dispatch_function_t)
    dispatch_function_t func = _dispatch_Block_invoke(work);
    // dispatch_async passes DC_FLAG_CONSUME, so this branch is taken
    if (dc_flags & DC_FLAG_CONSUME) {
        // Use _dispatch_call_block_and_release instead, which invokes
        // the block and then releases it
        func = _dispatch_call_block_and_release;
    }
    // Set the fields of the dispatch_continuation_s, including voucher and priority.
    // ctxt points to the heap copy of work
    // func is the function to execute
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}
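The flag arithmetic in the comments can be checked with a few lines of C. The three #define values are the ones quoted in this article; the helper names async_block_flags and flag_is_set are hypothetical and exist only for this sketch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Flag values as quoted in the article. */
#define DC_FLAG_CONSUME   0x004ul
#define DC_FLAG_BLOCK     0x010ul
#define DC_FLAG_ALLOCATED 0x100ul

/* Mirrors the two steps seen above: dispatch_async starts with
 * DC_FLAG_CONSUME, and the init function ORs in the other two flags. */
static uintptr_t async_block_flags(void)
{
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    return dc_flags;
}

/* Test a single flag with a bitwise AND. */
static bool flag_is_set(uintptr_t dc_flags, uintptr_t flag)
{
    return (dc_flags & flag) != 0;
}
```

Since each flag occupies its own bit, the OR never loses information, and a later check such as dc_flags & DC_FLAG_CONSUME can recover each flag independently.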
_dispatch_Block_copy
_dispatch_Block_copy calls Block_copy internally, which copies a stack block to the heap or, for a block already on the heap, increments its reference count.
void *
(_dispatch_Block_copy)(void *db)
{
    dispatch_block_t rval;
    if (likely(db)) {
        while (unlikely(!(rval = Block_copy(db)))) {
            // Retry until the block is copied successfully
            _dispatch_temporary_resource_shortage();
        }
        return rval;
    }
    DISPATCH_CLIENT_CRASH(0, "NULL was passed where a block should have been");
}

DISPATCH_NOINLINE
void
_dispatch_temporary_resource_shortage(void)
{
    sleep(1);
    __asm__ __volatile__(""); // prevent tailcall
}
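The copy loop is a "retry until the allocation succeeds, backing off in between" pattern. Here is a small sketch of that pattern with the allocator and the back-off step passed in as function pointers, so it can be exercised without actually sleeping. Every name in it (alloc_with_retry, flaky_alloc, count_backoff and the two counters) is hypothetical.

```c
#include <stdlib.h>

typedef void *(*alloc_fn_t)(size_t);
typedef void (*backoff_fn_t)(void);

/* Keep calling alloc until it succeeds, running backoff after each failure.
 * libdispatch's back-off step is sleep(1); here it is injected instead. */
static void *alloc_with_retry(size_t size, alloc_fn_t alloc,
        backoff_fn_t backoff)
{
    void *p;
    while (!(p = alloc(size))) {
        backoff();
    }
    return p;
}

/* Test doubles: an allocator that fails twice, and a back-off counter. */
static int failures_left = 2;
static int backoff_calls;

static void *flaky_alloc(size_t size)
{
    if (failures_left > 0) {
        failures_left--;
        return NULL;
    }
    return malloc(size);
}

static void count_backoff(void)
{
    backoff_calls++;
}
```

With the doubles above, alloc_with_retry(16, flaky_alloc, count_backoff) backs off twice and then returns a valid pointer. The trade-off of this design is that the caller never sees a failure, at the cost of blocking for as long as the shortage lasts.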
_dispatch_Block_invoke
If you are familiar with how a block is laid out internally, you know that invoke is a function pointer to the function the block executes. (When we define a block, the statements inside its {} are compiled into a function, and the invoke pointer in the block structure points to that function.)
_dispatch_Block_invoke is a macro that retrieves the invoke member from the block structure.
typedef void (*BlockInvokeFunction)(void *, ...);

struct Block_layout {
    ...
    // Function pointer to the function the block executes
    // (that is, the body in curly braces in the block definition)
    BlockInvokeFunction invoke;
    ...
};

#define _dispatch_Block_invoke(bb) \
        ((dispatch_function_t)((struct Block_layout *)bb)->invoke)
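The cast-and-read trick in the macro can be tried out without the blocks runtime. The sketch below uses a hypothetical, trimmed-down layout (mini_block_layout) with one captured value; the real Block_layout has more fields, and MINI_BLOCK_INVOKE and add_ten are names invented for illustration.

```c
#include <stdint.h>

typedef void (*function_t)(void *);

/* Hypothetical, simplified block layout: a header, the invoke pointer,
 * and one "captured" variable. */
struct mini_block_layout {
    void *isa;
    int32_t flags;
    int32_t reserved;
    void (*invoke)(void *); /* the function the block executes */
    int captured;
};

/* Same idea as _dispatch_Block_invoke: cast, then read the invoke member. */
#define MINI_BLOCK_INVOKE(bb) \
        ((function_t)((struct mini_block_layout *)(bb))->invoke)

/* The "body" of the block: it receives the block itself as its argument,
 * which is how it reaches the captured state. */
static void add_ten(void *block)
{
    struct mini_block_layout *b = block;
    b->captured += 10;
}
```

This also shows why storing the invoke function in dc_func and the block in dc_ctxt works: calling func(ctxt) hands the block to its own body as the first argument.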
_dispatch_block_has_private_data
Check whether the block’s invoke pointer points to _dispatch_block_special_invoke (an external function pointer).
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_block_has_private_data(const dispatch_block_t block)
{
return (_dispatch_Block_invoke(block) == _dispatch_block_special_invoke);
}
_dispatch_block_special_invoke
An external function pointer.
extern "C" {
// The compiler hides the name of the function it generates, and changes it if we try to reference it directly, but the linker still sees it.
extern void DISPATCH_BLOCK_SPECIAL_INVOKE(void *)
__asm__(OS_STRINGIFY(__USER_LABEL_PREFIX__) "___dispatch_block_create_block_invoke");
void (*const _dispatch_block_special_invoke)(void*) = DISPATCH_BLOCK_SPECIAL_INVOKE;
}
_dispatch_call_block_and_release
Execute a block and release the block.
void
_dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b(); // invoke the block
    Block_release(b); // release the block
}
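The "call, then release" wrapper is what gives DC_FLAG_CONSUME its meaning: the task owns one reference to its context and gives it up after running. A rough plain-C analogue with a manually reference-counted task is sketched below; task_t, task_create, task_release, call_task_and_release and the counters are all hypothetical names.

```c
#include <stdlib.h>

typedef struct task_s {
    int refcount;
    void (*body)(struct task_s *);
} task_t;

static int live_tasks;  /* number of tasks not yet freed */
static int last_result; /* written by the example body */

/* Create a task holding one reference; returns NULL on allocation failure. */
static task_t *task_create(void (*body)(task_t *))
{
    task_t *t = malloc(sizeof(*t));
    if (t) {
        t->refcount = 1;
        t->body = body;
        live_tasks++;
    }
    return t;
}

/* Drop one reference and free the task when none remain. */
static void task_release(task_t *t)
{
    if (--t->refcount == 0) {
        live_tasks--;
        free(t);
    }
}

/* Analogue of _dispatch_call_block_and_release: run the task, then release it. */
static void call_task_and_release(void *ctxt)
{
    task_t *t = ctxt;
    t->body(t);
    task_release(t);
}

/* Example body. */
static void store_seven(task_t *t)
{
    (void)t;
    last_result = 7;
}
```

After call_task_and_release(t) the task has run and its memory is gone, so the submitter must not touch t again. That mirrors why an async block is copied to the heap first and released by the queue rather than by the caller.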
_dispatch_continuation_init_f
_dispatch_continuation_init_f assigns the parameters passed in to the fields of the dispatch_continuation_s, then calls _dispatch_continuation_voucher_set and _dispatch_continuation_priority_set.
DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    pthread_priority_t pp = 0;
    // Add the DC_FLAG_ALLOCATED flag (#define DC_FLAG_ALLOCATED 0x100ul)
    dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
    dc->dc_func = f; // The function to be executed
    dc->dc_ctxt = ctxt;
    // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
    // should not be propagated, only taken from the handler if it has one
    if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
        pp = _dispatch_priority_propagate();
    }
    // Configure the voucher
    _dispatch_continuation_voucher_set(dc, flags);
    // Configure the priority
    return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
At this point the task has been wrapped in a dispatch_continuation_s. Next, let's look at the _dispatch_continuation_async function.
_dispatch_continuation_async
Once the dispatch_continuation_s is ready, dispatch_async calls _dispatch_continuation_async(dq, dc, qos, dc->dc_flags).
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    // Call the queue's dq_push function
    return dx_push(dqu._dq, dc, qos);
}

// dx_push is a macro
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

// dq_push is a function pointer member of the queue's vtable
void (*const dq_push)(dispatch_queue_class_t, dispatch_object_t, dispatch_qos_t);
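dx_push is a hand-written virtual call: the queue object points at a table of function pointers, and the macro looks up dq_push in that table. The sketch below reproduces the mechanism with two hypothetical queue kinds. All identifiers (mini_queue_t, mini_vtable_t, mini_push and so on) are made up for illustration.

```c
struct mini_queue_s;

/* A table of per-type operations; only push is modelled here. */
typedef struct mini_vtable_s {
    void (*dq_push)(struct mini_queue_s *q, int item);
} mini_vtable_t;

enum { PUSHED_BY_NONE, PUSHED_BY_ROOT, PUSHED_BY_LANE };

typedef struct mini_queue_s {
    const mini_vtable_t *do_vtable; /* selects the implementation */
    int last_item;
    int pushed_by;
} mini_queue_t;

/* Same shape as dx_vtable / dx_push in the listing above. */
#define mini_vtable(x) ((x)->do_vtable)
#define mini_push(x, y) mini_vtable(x)->dq_push(x, y)

static void root_queue_push(mini_queue_t *q, int item)
{
    q->last_item = item;
    q->pushed_by = PUSHED_BY_ROOT;
}

static void lane_push(mini_queue_t *q, int item)
{
    q->last_item = item;
    q->pushed_by = PUSHED_BY_LANE;
}

static const mini_vtable_t root_vtable = { root_queue_push };
static const mini_vtable_t lane_vtable = { lane_push };
```

The call site mini_push(&q, item) is identical for every queue; which function runs depends only on the vtable the queue was created with. That is the property _dispatch_continuation_async relies on.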
_dispatch_continuation_async uses the dx_push macro internally, which calls dq_push from the vtable of dqu (a dispatch_queue_class_t).
A global search for dq_push shows that it is assigned differently for each queue type: for example, the root queue (.dq_push = _dispatch_root_queue_push), the main queue (.dq_push = _dispatch_main_queue_push), concurrent queues (.dq_push = _dispatch_lane_concurrent_push), serial queues (.dq_push = _dispatch_lane_push), and so on. Since our custom queues all target a root queue (where most tasks are ultimately executed), we use _dispatch_root_queue_push as the example.
_dispatch_root_queue_push
_dispatch_root_queue_push ends by calling _dispatch_root_queue_push_inline(rq, dou, dou, 1).
DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
    dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
    if (unlikely(ddi && ddi->ddi_can_stash)) {
        dispatch_object_t old_dou = ddi->ddi_stashed_dou;
        dispatch_priority_t rq_overcommit;
        rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
        if (likely(!old_dou._do || rq_overcommit)) {
            dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
            dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
            ddi->ddi_stashed_rq = rq;
            ddi->ddi_stashed_dou = dou;
            ddi->ddi_stashed_qos = qos;
            _dispatch_debug("deferring item %p, rq %p, qos %d",
                    dou._do, rq, qos);
            if (rq_overcommit) {
                ddi->ddi_can_stash = false;
            }
            if (likely(!old_dou._do)) {
                return;
            }
            // push the previously stashed item
            qos = old_qos;
            rq = old_rq;
            dou = old_dou;
        }
    }
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
    if (_dispatch_root_queue_push_needs_override(rq, qos)) {
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
#else
    (void)qos;
#endif
    // Call the inline function
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
_dispatch_root_queue_push_inline
_dispatch_root_queue_push_inline appends the item to the queue's dq_items list and, when os_mpsc_push_list returns true, calls _dispatch_root_queue_poke.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
        dispatch_object_t _head, dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
    if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
        // Call _dispatch_root_queue_poke
        return _dispatch_root_queue_poke(dq, n, 0);
    }
}
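The return value of os_mpsc_push_list decides whether the queue gets poked. The sketch below is a simplified multi-producer push written with C11 atomics, under the assumption that "true" means the list was empty before the push, so that only the first item triggers a wake-up. It is not the real macro; item_t, mpsc_queue_t, mpsc_init and mpsc_push are hypothetical, and the default sequentially consistent ordering is used for simplicity.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct item_s {
    struct item_s *_Atomic do_next;
} item_t;

typedef struct mpsc_queue_s {
    item_t *_Atomic head;
    item_t *_Atomic tail;
} mpsc_queue_t;

static void mpsc_init(mpsc_queue_t *q)
{
    atomic_init(&q->head, NULL);
    atomic_init(&q->tail, NULL);
}

/* Push one item. Returns true if the queue was empty before this push,
 * which is the caller's cue to wake up a consumer. */
static bool mpsc_push(mpsc_queue_t *q, item_t *item)
{
    atomic_store(&item->do_next, NULL);
    /* Swap ourselves in as the new tail; prev is the old tail. */
    item_t *prev = atomic_exchange(&q->tail, item);
    if (prev == NULL) {
        atomic_store(&q->head, item); /* first item: publish the head */
        return true;
    }
    atomic_store(&prev->do_next, item); /* link behind the old tail */
    return false;
}
```

Under this reading, a burst of pushes onto a non-empty queue costs one atomic exchange each and no wake-ups, because whoever pushed the first item has already requested a worker.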
_dispatch_root_queue_poke
_dispatch_root_queue_poke mainly performs some sanity checks and then calls _dispatch_root_queue_poke_slow.
DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    if (!_dispatch_queue_class_probe(dq)) {
        return;
    }
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
    {
        if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
            _dispatch_root_queue_debug("worker thread request still pending "
                    "for global queue: %p", dq);
            return;
        }
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    // The checks above guard against redundant requests; the real work is
    // done by _dispatch_root_queue_poke_slow, called here
    return _dispatch_root_queue_poke_slow(dq, n, floor);
}
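The compare-and-swap on dgq_pending acts as a "request already in flight" latch: only the caller that moves it from 0 to n goes on to ask for threads. A minimal sketch of that latch with C11 atomics follows. The type mini_root_queue_t and the functions try_mark_pending and request_served are hypothetical, and how the real counter is brought back down is not shown in the code above, so request_served is purely an assumption for the sake of the example.

```c
#include <stdatomic.h>
#include <stdbool.h>

typedef struct mini_root_queue_s {
    atomic_int pending; /* number of worker requests not yet served */
} mini_root_queue_t;

/* Try to move pending from 0 to n. Returns true if this caller won and
 * should go on to request n worker threads. */
static bool try_mark_pending(mini_root_queue_t *q, int n)
{
    int expected = 0;
    return atomic_compare_exchange_strong(&q->pending, &expected, n);
}

/* Assumed counterpart: clear the latch once the request has been served. */
static void request_served(mini_root_queue_t *q)
{
    atomic_store(&q->pending, 0);
}
```

While a request is pending, every other poke returns early, which keeps a flood of dispatch_async calls from turning into a flood of thread requests.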
_dispatch_root_queue_poke_slow
DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
// n = 1
int remaining = n;
int r = ENOSYS;
// Initialize the root queues and their thread pool support (dispatch_once_f is called internally, so this runs only once globally)
_dispatch_root_queues_init();
// In debug builds, log the name of the executing function (__func__)
_dispatch_debug_root_queue(dq, __func__);
// Tracing hook
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
if (likely(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) {
return;
}
}
}
bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (overcommit) {
os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
} else {
if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
}
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
// Get the thread pool size
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
// Calculate the number of requests that can be made
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
// The pool has no capacity left: log it and give up
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
#if !defined(_WIN32)
pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
pthr = _dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
// Create a thread with pthread_create
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else // defined(_WIN32)
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
_dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
unsigned dwStackSize = 64 * 1024;
#endif
uintptr_t hThread = 0;
while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
if (errno != EAGAIN) {
(void)dispatch_assume(hThread);
}
_dispatch_temporary_resource_shortage();
}
if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
}
CloseHandle((HANDLE)hThread);
} while (--remaining);
#endif // defined(_WIN32)
#else
(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}
According to the code, the function reads the thread pool's remaining size, works out how many threads it may request, and then uses two do-while loops to create threads dynamically: the first reserves capacity from the pool, and the second creates the threads.
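The first of the two loops is a compare-and-swap reservation: clamp the request to what the pool can still give, then try to subtract it atomically, retrying if another thread changed the pool size in the meantime. A sketch with C11 atomics is shown below. The function name reserve_threads is hypothetical, and the bookkeeping on dgq_pending and the debug logging are left out.

```c
#include <stdatomic.h>

/* Try to reserve up to `remaining` threads from a pool, never taking the
 * pool below `floor`. Returns how many threads were actually reserved
 * (0 if the pool has nothing left to give). */
static int reserve_threads(atomic_int *pool_size, int remaining, int floor)
{
    int t_count = atomic_load(pool_size);
    do {
        /* How many threads may still be requested? */
        int can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            remaining = can_request; /* reduce the request */
        }
        if (remaining == 0) {
            return 0; /* pool is full */
        }
        /* On failure, t_count is refreshed with the current value
         * and the loop recomputes everything. */
    } while (!atomic_compare_exchange_weak(pool_size, &t_count,
            t_count - remaining));
    return remaining;
}
```

For example, with a pool size of 3 and a floor of 1, a request for 5 threads is reduced to 2 and the pool size drops to 1. The second loop in the real function then creates one thread per reserved slot.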
The complete call chain, roughly condensed, is: dispatch_async ➡️ _dispatch_continuation_init ➡️ _dispatch_continuation_async ➡️ dx_push ➡️ dq_push (_dispatch_root_queue_push) ➡️ _dispatch_root_queue_push_inline ➡️ _dispatch_root_queue_poke ➡️ _dispatch_root_queue_poke_slow ➡️ pthread_create (or _pthread_workqueue_addthreads when the pthread workqueue path is taken).
dispatch_sync
Next, let's look at how the dispatch_sync function executes.
DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
// #define DC_FLAG_BLOCK 0x010ul
uintptr_t dc_flags = DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
// work is the block, passed as the context (ctxt)
// _dispatch_Block_invoke(work) is the block's invoke function, passed as func
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
The dispatch_sync function internally invokes the _dispatch_sync_f function.
_dispatch_sync_f
_dispatch_sync_f contains a single line that directly calls _dispatch_sync_f_inline.
DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
_dispatch_sync_f_inline
_dispatch_sync_f_inline begins by checking dq->dq_width == 1. The dq_width of a serial queue is 1, the dq_width of a custom concurrent queue is 0xffeull, and the dq_width of a root queue is 0xfffull. That is, if dq is a serial queue, _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags) is executed; if dq is a concurrent queue, the rest of the function is executed.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    // Serial queue: dq_width == 1
    if (likely(dq->dq_width == 1)) {
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }
    // _DISPATCH_LANE_TYPE = 0x00000011, // meta-type for lanes
    // _DISPATCH_META_TYPE_MASK = 0x000000ff, // mask for object meta-types
    // #define dx_metatype(x) (dx_vtable(x)->do_type & _DISPATCH_META_TYPE_MASK)
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }
    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
upcast
The input is converted to the dispatch_object_t type.
DISPATCH_ALWAYS_INLINE
static inline dispatch_object_t
upcast(dispatch_object_t dou)
{
return dou;
}
_dispatch_barrier_sync_f
_dispatch_barrier_sync_f only calls _dispatch_barrier_sync_f_inline.
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
_dispatch_barrier_sync_f_inline
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    // Get the ID of the current thread (kept in thread-local storage)
    dispatch_tid tid = _dispatch_tid_self();
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    // The more correct thing to do would be to merge the qos of the
    // thread that just acquired the barrier lock into the queue state.
    //
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread
    // contends with this fast path, this thread may receive a useless override.
    //
    // Global concurrent queues and queues bound to non-dispatch threads always
    // fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    //
    // Failing to acquire the barrier leads to the slow path, which is also
    // where a deadlock is detected
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }
    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    // Execute the block
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
In everyday development we know that if a task running on a serial queue calls dispatch_sync to submit another task to that same queue, a deadlock occurs. The cause can be traced from the _dispatch_queue_try_acquire_barrier_sync(dl, tid) call.
Suppose we write the following in viewDidLoad (which runs on the main thread), where mainQueue is the main queue:
dispatch_sync(mainQueue, ^{
    NSLog(@"✈️✈️✈️ crash.");
});
The app crashes with the following backtrace: 0 __DISPATCH_WAIT_FOR_QUEUE__ ⬅️ 1 _dispatch_sync_f_slow ⬅️ 2 -[ViewController viewDidLoad]… That is, the crash occurs in the __DISPATCH_WAIT_FOR_QUEUE__ function.
Let's first follow the call into _dispatch_queue_try_acquire_barrier_sync.
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}
The _dispatch_queue_try_acquire_barrier_sync_and_suspend function is called directly.
_dispatch_queue_try_acquire_barrier_sync_and_suspend
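_dispatch_queue_try_acquire_barrier_sync_and_suspend uses the os_atomic_rmw_loop2o macro to atomically read the queue's dq_state and, only if the queue is completely idle, replace it with a state marking the barrier as held by the current thread. It returns whether that update succeeded.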
/*
 * Used by _dispatch_barrier_{try,}sync
 *
 * Note, this fails if any of e:1 or dl!=0, but that allows this code to be
 * a simple cmpxchg which is significantly faster on Intel, and makes a
 * significant difference on the uncontended codepath.
 *
 * See discussion for DISPATCH_QUEUE_DIRTY in queue_internal.h
 *
 * Initial state must be `completely idle`
 * Final state forces { ib:1, qf:1, w:0 }
 */
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
        uint32_t tid, uint64_t suspend_count)
{
    // #define DISPATCH_QUEUE_STATE_INIT_VALUE(width) ((DISPATCH_QUEUE_WIDTH_FULL - (width)) << DISPATCH_QUEUE_WIDTH_SHIFT)
    // If dq->dq_width is 1, init = 0xfffull << 41
    uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    // #define DISPATCH_QUEUE_WIDTH_FULL_BIT 0x0020000000000000ull
    // #define DISPATCH_QUEUE_IN_BARRIER 0x0040000000000000ull
    // #define DISPATCH_QUEUE_SUSPEND_INTERVAL 0x0400000000000000ull
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
            _dispatch_lock_value_from_tid(tid) |
            (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    uint64_t old_state, new_state;
    // #define DISPATCH_QUEUE_ROLE_MASK 0x0000003000000000ull
    // #define os_atomic_rmw_loop2o(p, f, ov, nv, m, ...) os_atomic_rmw_loop(&(p)->f, ov, nv, m, __VA_ARGS__)
    // Atomically update dq_state: give up unless the queue is idle
    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
        uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
        if (old_state != (init | role)) {
            os_atomic_rmw_loop_give_up(break);
        }
        new_state = value | role;
    });
}
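The acquire step boils down to: compute what an idle queue's state looks like, and swap in a "barrier held by tid" state only if the current state matches. The sketch below re-creates that logic with C11 atomics. The bit values for the full-width bit, the barrier bit and the role mask are the ones quoted in the comments above; QUEUE_WIDTH_FULL (0x1000), QUEUE_WIDTH_SHIFT (41) and LOCK_OWNER_MASK (0xfffffffc) are assumptions inferred from the "0xfffull << 41" comment and from libdispatch conventions, the suspend count is omitted, and the function name try_acquire_barrier is hypothetical.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Values quoted in the article's comments. */
#define QUEUE_WIDTH_FULL_BIT 0x0020000000000000ull
#define QUEUE_IN_BARRIER     0x0040000000000000ull
#define QUEUE_ROLE_MASK      0x0000003000000000ull
/* Assumed values (not shown in the article). */
#define QUEUE_WIDTH_FULL     0x1000ull
#define QUEUE_WIDTH_SHIFT    41
#define LOCK_OWNER_MASK      0xfffffffcull

#define QUEUE_STATE_INIT_VALUE(width) \
        ((QUEUE_WIDTH_FULL - (width)) << QUEUE_WIDTH_SHIFT)

/* Try to take the barrier for thread `tid`. Succeeds only if the queue
 * state is exactly the idle state (ignoring the role bits). */
static bool try_acquire_barrier(_Atomic uint64_t *dq_state, uint64_t width,
        uint32_t tid)
{
    uint64_t init = QUEUE_STATE_INIT_VALUE(width);
    uint64_t value = QUEUE_WIDTH_FULL_BIT | QUEUE_IN_BARRIER |
            (tid & LOCK_OWNER_MASK);
    uint64_t old_state = atomic_load(dq_state);
    for (;;) {
        uint64_t role = old_state & QUEUE_ROLE_MASK;
        if (old_state != (init | role)) {
            return false; /* not idle: give up, caller takes the slow path */
        }
        if (atomic_compare_exchange_weak(dq_state, &old_state,
                value | role)) {
            return true;
        }
        /* old_state was refreshed by the failed exchange; re-check it. */
    }
}
```

Once a thread has acquired the barrier, the state no longer equals the idle value, so any further attempt fails immediately, including one made by the very thread that holds it. That second case is what leads to the deadlock check on the slow path.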
_dispatch_lock_value_from_tid masks the thread ID with DLOCK_OWNER_MASK to obtain a dispatch_lock value.
DISPATCH_ALWAYS_INLINE
static inline dispatch_lock
_dispatch_lock_value_from_tid(dispatch_tid tid)
{
return tid & DLOCK_OWNER_MASK;
}
In _dispatch_barrier_sync_f_inline, if the barrier cannot be acquired, _dispatch_sync_f_slow is executed. Let's look at that function.
_dispatch_sync_f_slow
Inside _dispatch_sync_f_slow we can see the call to __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq).
_dispatch_sync_f_slow builds a dispatch_sync_context_s describing the task, records it with _dispatch_trace_item_push, and then calls __DISPATCH_WAIT_FOR_QUEUE__ to wait on the target queue (FIFO) before the function is executed.
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
// Task information
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
// Record the push of the sync context (tracing hook)
_dispatch_trace_item_push(top_dq, &dsc);
// ⬇️⬇️⬇️
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
__DISPATCH_WAIT_FOR_QUEUE__
__DISPATCH_WAIT_FOR_QUEUE__ gets the queue's state and calls _dq_state_drain_locked_by, which uses an XOR to check whether the queue's drain lock is owned by the waiting thread. If it is, the current thread would be waiting on a queue it already owns, so libdispatch crashes deliberately instead of deadlocking.
DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    // Get the queue's state
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }
    ...
}
_dq_state_drain_locked_by
DISPATCH_ALWAYS_INLINE
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// #define DISPATCH_QUEUE_DRAIN_OWNER_MASK ((uint64_t)DLOCK_OWNER_MASK)
// equivalent to _dispatch_lock_owner(lock_value) == tid
// lock_value is the low 32 bits of the queue state; tid is the thread ID
// ^ (xor): a result bit is 0 where the two operands agree and 1 where they differ
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
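The XOR-and-mask comparison is easy to try in isolation. In the sketch below, LOCK_OWNER_MASK is assumed to be 0xfffffffc (the value is not shown in this article), and lock_is_locked_by and sync_would_deadlock are hypothetical names that mirror the two functions above.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed value: the two lowest bits are not part of the owner field. */
#define LOCK_OWNER_MASK 0xfffffffcu

/* True if the owner bits of lock_value equal the owner bits of tid.
 * XOR leaves a 1 only where the two differ, so masking the XOR and
 * comparing with 0 is an equality test on the masked bits. */
static bool lock_is_locked_by(uint32_t lock_value, uint32_t tid)
{
    return ((lock_value ^ tid) & LOCK_OWNER_MASK) == 0;
}

/* The owner lives in the low 32 bits of the 64-bit queue state, so the
 * check truncates the state before comparing. */
static bool sync_would_deadlock(uint64_t dq_state, uint32_t waiter_tid)
{
    return lock_is_locked_by((uint32_t)dq_state, waiter_tid);
}
```

If the state records thread 0x1234 as the owner and the waiter is also 0x1234, the check returns true, which is exactly the situation where the real code crashes instead of waiting forever. The two low bits are ignored, so they can change without affecting the ownership test.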
To summarize: dispatch_sync first gets the tid of the current thread and tries to acquire the queue. If that fails, it reads the queue's state and compares the owner recorded in it with the current tid. If they match, the current thread is about to wait on a queue it already owns, which would deadlock, so libdispatch crashes.
Now that we have found the cause of the serial queue deadlock, let's go back to the earlier flow and see how dispatch_sync executes in the normal case.
In _dispatch_barrier_sync_f_inline, the normal path calls _dispatch_lane_barrier_sync_invoke_and_complete.
_dispatch_lane_barrier_sync_invoke_and_complete
/*
 * For queues we can cheat and inline the unlock code,
 * which is invalid for objects with a more complex state machine
 * (sources or mach channels)
 */
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
        void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    ...
}
_dispatch_sync_function_invoke_inline
_dispatch_sync_function_invoke_inline performs the task by calling _dispatch_client_callout.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
// func(ctxt)
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
_dispatch_client_callout
// libdispatch contains several definitions of _dispatch_client_callout;
// which one is compiled depends on the build configuration.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    return f(ctxt); // ⬅️
}

DISPATCH_NOINLINE
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    _dispatch_get_tsd_base();
    void *u = _dispatch_get_unwind_tsd();
    if (likely(!u)) return f(ctxt);
    _dispatch_set_unwind_tsd(NULL);
    f(ctxt); // ⬅️
    _dispatch_free_unwind_tsd();
    _dispatch_set_unwind_tsd(u);
}

void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    @try {
        return f(ctxt); // ⬅️
    }
    @catch (...) {
        objc_terminate();
    }
}
At this point the block passed to dispatch_sync has been executed.
When dispatch_sync submits a task to a serial queue, the condensed call chain is: dispatch_sync ➡️ _dispatch_sync_f ➡️ _dispatch_sync_f_inline ➡️ _dispatch_barrier_sync_f ➡️ _dispatch_barrier_sync_f_inline ➡️ _dispatch_lane_barrier_sync_invoke_and_complete ➡️ _dispatch_sync_function_invoke_inline ➡️ _dispatch_client_callout ➡️ f(ctxt).
When dispatch_sync submits a task to a concurrent queue, the condensed call chain is: dispatch_sync ➡️ _dispatch_sync_f ➡️ _dispatch_sync_f_inline ➡️ _dispatch_sync_invoke_and_complete ➡️ _dispatch_sync_function_invoke_inline ➡️ _dispatch_client_callout ➡️ f(ctxt).