Last time we looked at queue creation; this time we turn to the related functions in GCD, starting with the two we use most: dispatch_async and dispatch_sync.

Reading GCD source involves many structure definitions built up from macros; expanding those macros step by step makes the code much easier to follow.

dispatch_async

When we submit a task to a queue, both the block form and the function form are eventually encapsulated as a dispatch_continuation_s, so we can think of it as a structure describing the task content. The dispatch_continuation_s structure is first created inside the dispatch_async function.

The first thing to know is that both dispatch_async (submitting a block asynchronously to a queue) and dispatch_async_f (submitting a function asynchronously to a queue) wrap the submitted task as a dispatch_continuation_s. The structure uses a function pointer (dc_func) to store the task to be performed; when a block is submitted, dispatch_continuation_s stores the function defined by the block structure (its invoke function), not the block object itself.
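
For reference, a minimal usage sketch of the two submission forms (the queue label and work_func below are illustrative, and the comments describe the wrapping this article walks through):

#import <Foundation/Foundation.h>

static void work_func(void *ctxt) {
    NSLog(@"function form, ctxt = %p", ctxt);
}

static void submit_examples(void) {
    dispatch_queue_t q = dispatch_queue_create("com.example.queue", DISPATCH_QUEUE_SERIAL);
    // Block form: per the source below, the continuation's dc_func ends up as
    // _dispatch_call_block_and_release, with the heap copy of the block as its context.
    dispatch_async(q, ^{ NSLog(@"block form"); });
    // Function form: the continuation stores work_func in dc_func and NULL as its context.
    dispatch_async_f(q, NULL, work_func);
}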

The dispatch_async function can be logically divided into two parts: the first wraps the work into a continuation (_dispatch_continuation_init), and the second hands it off for execution (_dispatch_continuation_async).

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    // 1️⃣ Get a dispatch_continuation_s (from the thread cache, or newly allocated)
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    
    // 2️⃣ DC_FLAG_CONSUME: the continuation's resources are released when it runs
    // #define DC_FLAG_CONSUME 0x004ul
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    
    // dispatch_qos_t is a type alias for uint32_t
    dispatch_qos_t qos;
    
    // 3️⃣ A series of initializations and configurations of the dispatch_continuation_s structure
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    
    // When the dispatch_continuation_s is ready, call _dispatch_continuation_async
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

Below we first analyze how dispatch_async initializes and configures the dispatch_continuation_s structure, and then step into the _dispatch_continuation_async function to explore how the most commonly used asynchronous invocation function is implemented.

The dispatch_continuation_s structure, with its macro definitions expanded, is as follows:

typedef struct dispatch_continuation_s {
    union {
        const void *do_vtable;
        uintptr_t dc_flags;
    };
    
    union {
        pthread_priority_t dc_priority;
        int dc_cache_cnt;
        uintptr_t dc_pad;
    };
    
    struct dispatch_continuation_s *volatile do_next; // Next task
    struct voucher_s *dc_voucher;
    
    // typedef void (*dispatch_function_t)(void *_Nullable);
    
    dispatch_function_t dc_func; // A pointer to the function to be executed
    void *dc_ctxt; // Method context (parameter)
    void *dc_data; // Related data
    void *dc_other; // Other information
} *dispatch_continuation_t;

_dispatch_continuation_alloc

_dispatch_continuation_alloc internally calls _dispatch_continuation_alloc_cacheonly to retrieve a dispatch_continuation_t from the cache. If none is available, it calls _dispatch_continuation_alloc_from_heap to create a dispatch_continuation_s on the heap.

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_continuation_alloc(void)
{
    dispatch_continuation_t dc =
            _dispatch_continuation_alloc_cacheonly();
    // Create dispatch_continuation_s in the heap if it does not exist in the cache
    if (unlikely(!dc)) {
        return _dispatch_continuation_alloc_from_heap();
    }
    return dc;
}

_dispatch_continuation_alloc_cacheonly

_dispatch_continuation_alloc_cacheonly internally calls _dispatch_thread_getspecific to read, with dispatch_cache_key as the key, the dispatch_continuation_t stored for the current thread and assigns it to dc. It then calls _dispatch_thread_setspecific to store dc->do_next back as the new value for that key (i.e., it updates the head of the per-thread cache).

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_continuation_alloc_cacheonly(void)
{
    dispatch_continuation_t dc = (dispatch_continuation_t)
            _dispatch_thread_getspecific(dispatch_cache_key);

    // Update the value of dispatch_cache_key as the key stored in thread storage
    if (likely(dc)) {
        _dispatch_thread_setspecific(dispatch_cache_key, dc->do_next);
    }
    return dc;
}

#define _dispatch_thread_getspecific(key) \
    (_dispatch_get_tsd_base()->key)
#define _dispatch_thread_setspecific(key, value) \
    (void)(_dispatch_get_tsd_base()->key = (value))
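
For comparison, here is a minimal standalone sketch of the same per-thread free-list pattern, written with plain pthread TSD; the key, node type, and function names are illustrative, not libdispatch's:

#include <pthread.h>
#include <stddef.h>

// Illustrative node standing in for dispatch_continuation_s.
typedef struct node_s { struct node_s *do_next; } node_t;

static pthread_key_t cache_key; // assume pthread_key_create(&cache_key, NULL) ran once at startup

// Mirrors _dispatch_continuation_alloc_cacheonly: read the cached head for this
// thread, then store head->do_next back as the new head of the cache.
static node_t *cache_pop(void) {
    node_t *head = (node_t *)pthread_getspecific(cache_key);
    if (head) {
        pthread_setspecific(cache_key, head->do_next);
    }
    return head; // NULL means the caller must allocate from the heap instead
}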

DC_FLAG

DC_FLAG_CONSUME: the continuation's resources are freed when it runs; this is set for asynchronous dispatch or for non-event-handler source handlers.

#define DC_FLAG_CONSUME   0x004ul

DC_FLAG_BLOCK: the continuation function is a block.

#define DC_FLAG_BLOCK   0x010ul

DC_FLAG_ALLOCATED: a bit used to ensure that the dc_flags of an allocated continuation is never 0.

#define DC_FLAG_ALLOCATED   0x100ul

dispatch_qos_t is a type alias for uint32_t.

typedef uint32_t dispatch_qos_t;

_dispatch_continuation_init

The _dispatch_continuation_init function initializes dc according to its input parameters.

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    // copy the block to the heap.
    void *ctxt = _dispatch_Block_copy(work);
    
    // input parameter dc_flags is DC_FLAG_CONSUME (0x004ul)
    // #define DC_FLAG_BLOCK 0x010ul
    // #define DC_FLAG_ALLOCATED 0x100ul
    
    // dc_flags = 0x114ul, merge the above three enumerations together
    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    
    // Check whether the block's invoke pointer points to _dispatch_block_special_invoke
    // (i.e., the block carries private data)
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        
        // Perform slow initialization of dispatch_continuation_s
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }

    // Get the block's invoke function pointer (cast to dispatch_function_t)
    dispatch_function_t func = _dispatch_Block_invoke(work);
    
    // the default dc_flags parameter of dispatch_async is DC_FLAG_CONSUME
    if (dc_flags & DC_FLAG_CONSUME) {
        // _dispatch_call_block_and_release invokes the work block and then releases it
        func = _dispatch_call_block_and_release;
    }
    
    // Set the remaining member variables of the dispatch_continuation_s structure,
    // including voucher and priority.
    
    // ctxt is a pointer to the heap copy of work
    // func is the function to execute
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

_dispatch_Block_copy

_dispatch_Block_copy internally calls Block_copy to copy a stack block to the heap, or to increment the reference count of a block already on the heap.

void *
(_dispatch_Block_copy)(void *db)
{
    dispatch_block_t rval;

    if (likely(db)) {
        while (unlikely(!(rval = Block_copy(db)))) {
            // Ensure that the block is successfully replicated
            _dispatch_temporary_resource_shortage();
        }
        return rval;
    }
    DISPATCH_CLIENT_CRASH(0, "NULL was passed where a block should have been");
}

DISPATCH_NOINLINE
void
_dispatch_temporary_resource_shortage(void)
{
    sleep(1);
    __asm__ __volatile__("");  // prevent tailcall
}

_dispatch_Block_invoke

If you are familiar with the internal structure of a block, invoke is a function pointer to the function the block executes. (When we define a block, the statements inside the {} form a complete function, and the invoke pointer in the block structure points to that function.)

_dispatch_Block_invoke is a macro definition that retrieves the invoke member variable in the block structure.

typedef void (*BlockInvokeFunction)(void *, ...);

struct Block_layout {
    ...
    // Function pointer to the function the block executes
    // (i.e., the code inside the curly braces of the block definition)
    BlockInvokeFunction invoke;
    ...
};

#define _dispatch_Block_invoke(bb) \
        ((dispatch_function_t)((struct Block_layout *)bb)->invoke)
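
As an illustration (not the libdispatch source), here is a hedged sketch of calling a block through its invoke pointer, using a simplified Block_layout that follows the public blocks ABI (isa, flags, reserved, invoke):

#include <stdio.h>

struct Block_layout_sketch {
    void *isa;
    int flags;
    int reserved;
    void (*invoke)(void *, ...);
};

int main(void) {
    void (^work)(void) = ^{ printf("hello from the block body\n"); };
    struct Block_layout_sketch *layout = (struct Block_layout_sketch *)work;
    // The block object itself is always passed as the first argument to invoke.
    layout->invoke(work);
    return 0;
}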

_dispatch_block_has_private_data

Check whether the block’s invoke pointer points to _dispatch_block_special_invoke (an external function pointer).

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_block_has_private_data(const dispatch_block_t block)
{
    return (_dispatch_Block_invoke(block) == _dispatch_block_special_invoke);
}

_dispatch_block_special_invoke

An external function pointer.

extern "C" {
// The compiler hides the name of the function it generates, and changes it if we try to reference it directly, but the linker still sees it.
extern void DISPATCH_BLOCK_SPECIAL_INVOKE(void *)
        __asm__(OS_STRINGIFY(__USER_LABEL_PREFIX__) "___dispatch_block_create_block_invoke");
void (*const _dispatch_block_special_invoke)(void*) = DISPATCH_BLOCK_SPECIAL_INVOKE;
}

_dispatch_call_block_and_release

Execute a block and release the block.

void
_dispatch_call_block_and_release(void *block)
{
    void (^b)(void) = block;
    b();              // execute the block
    Block_release(b); // release the block
}

_dispatch_continuation_init_f

Configures the member variables of the dispatch_continuation_s structure by assigning the passed-in parameters to dc, and then executes the _dispatch_continuation_voucher_set and _dispatch_continuation_priority_set functions.

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    pthread_priority_t pp = 0;
    dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED; // add DC_FLAG_ALLOCATED (#define DC_FLAG_ALLOCATED 0x100ul)
    dc->dc_func = f; // The function to be executed
    dc->dc_ctxt = ctxt;
    
    // In this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority should
    // not be propagated, only taken from the handler if it has one.
    if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
        pp = _dispatch_priority_propagate();
    }
    
    // Configure the voucher
    _dispatch_continuation_voucher_set(dc, flags);
    // Configure the priority
    return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

At this point the wrapping of the task (dispatch_continuation_s) is complete, and let’s look at the contents of the _dispatch_continuation_async function.

_dispatch_continuation_async

When the dispatch_continuation_s is ready, dispatch_async calls _dispatch_continuation_async(dq, dc, qos, dc->dc_flags).

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif

    // Call the queue's dq_push function
    return dx_push(dqu._dq, dc, qos);
}

// dx_push is a macro definition
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

void (*const dq_push)(dispatch_queue_class_t, dispatch_object_t, dispatch_qos_t);

Internally, _dispatch_continuation_async uses the dx_push macro, which calls dq_push from the vtable of dqu (a dispatch_queue_class_t).

A global search for dq_push finds it assigned differently for different queue types: the root queue (.dq_push = _dispatch_root_queue_push), the main queue (.dq_push = _dispatch_main_queue_push), concurrent queues (.dq_push = _dispatch_lane_concurrent_push), serial queues (.dq_push = _dispatch_lane_push), and so on. Since our custom queues ultimately target a root queue (where most tasks are executed), we take _dispatch_root_queue_push as the example.
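
Before moving on, a hedged sketch of the vtable pattern that dx_push relies on; all names below are illustrative stand-ins, not the libdispatch types:

typedef struct queue_s queue_t;

typedef struct {
    void (*dq_push)(queue_t *q, void *item); // one entry per queue operation
} queue_vtable_t;

struct queue_s {
    const queue_vtable_t *vtable; // each queue "class" points at its own table
};

static void root_queue_push(queue_t *q, void *item)   { (void)q; (void)item; /* hand the item to a worker thread */ }
static void serial_queue_push(queue_t *q, void *item) { (void)q; (void)item; /* append the item to the queue's list */ }

static const queue_vtable_t root_vtable   = { .dq_push = root_queue_push };
static const queue_vtable_t serial_vtable = { .dq_push = serial_queue_push };

// The same call site dispatches to different implementations, as dx_push does.
#define my_dx_push(q, item) ((q)->vtable->dq_push((q), (item)))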

_dispatch_root_queue_push

The _dispatch_root_queue_push function finally calls _dispatch_root_queue_push_inline(rq, dou, dou, 1).

DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
    dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
    if (unlikely(ddi && ddi->ddi_can_stash)) {
        dispatch_object_t old_dou = ddi->ddi_stashed_dou;
        dispatch_priority_t rq_overcommit;
        rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;

        if (likely(!old_dou._do || rq_overcommit)) {
            dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
            dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
            ddi->ddi_stashed_rq = rq;
            ddi->ddi_stashed_dou = dou;
            ddi->ddi_stashed_qos = qos;
            _dispatch_debug("deferring item %p, rq %p, qos %d",
                    dou._do, rq, qos);
            if (rq_overcommit) {
                ddi->ddi_can_stash = false;
            }
            if (likely(!old_dou._do)) {
                return;
            }
            // push the previously stashed item
            qos = old_qos;
            rq = old_rq;
            dou = old_dou;
        }
    }
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
    if (_dispatch_root_queue_push_needs_override(rq, qos)) {
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
#else
    (void)qos;
#endif
    // Call the inline function
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

_dispatch_root_queue_push_inline

_dispatch_root_queue_push_inline calls the _dispatch_root_queue_poke function.

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
        dispatch_object_t _head, dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
    if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
    
        // _dispatch_root_queue_poke
        return _dispatch_root_queue_poke(dq, n, 0);
    }
}

_dispatch_root_queue_poke

The _dispatch_root_queue_poke function mainly does some fault-tolerance checks and then calls the _dispatch_root_queue_poke_slow function.

DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    if (!_dispatch_queue_class_probe(dq)) {
        return;
    }
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
    {
        if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
            _dispatch_root_queue_debug("worker thread request still pending "
                    "for global queue: %p", dq);
            return;
        }
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE

    // Above is some fault tolerance, mainly the _dispatch_root_queue_poke_slow function called here
    return _dispatch_root_queue_poke_slow(dq, n, floor);
}

_dispatch_root_queue_poke_slow

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    // n = 1
    int remaining = n;
    int r = ENOSYS;

    // create a thread pool associated with the root queue (dispatch_once_f is called internally, globally only once)
    _dispatch_root_queues_init();
    // In DEBUG mode, print __func__ when this function executes
    _dispatch_debug_root_queue(dq, __func__);
    // hook
    _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);

#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
    {
        _dispatch_root_queue_debug("requesting new worker thread for global "
                "queue: %p", dq);
        r = _pthread_workqueue_addthreads(remaining,
                _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
        (void)dispatch_assume_zero(r);
        return;
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
    if (likely(pqc->dpq_thread_mediator.do_vtable)) {
        while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
            _dispatch_root_queue_debug("signaled sleeping worker for "
                    "global queue: %p", dq);
            if (!--remaining) {
                return;
            }
        }
    }
    
    bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    if (overcommit) {
        os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
    } else {
        if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
            _dispatch_root_queue_debug("worker thread request still pending for "
                    "global queue: %p", dq);
            return;
        }
    }
    
    int can_request, t_count;
    // seq_cst with atomic store to tail <rdar://problem/16932833>
    
    // Get the thread pool size
    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        // Calculate the number of requests that can be made
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                    remaining, can_request);
            os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
            remaining = can_request;
        }
        
        if (remaining == 0) {
            // The thread pool is full; log and give up
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
            return;
        }
    } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
            t_count - remaining, &t_count, acquire));

#if !defined(_WIN32)
    pthread_attr_t *attr = &pqc->dpq_thread_attr;
    pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
        pthr = _dispatch_mgr_root_queue_init();
    }
#endif
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        
        // create a thread pthread_create
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
#else // defined(_WIN32)
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
        _dispatch_mgr_root_queue_init();
    }
#endif
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
        unsigned dwStackSize = 0;
#else
        unsigned dwStackSize = 64 * 1024;
#endif
        uintptr_t hThread = 0;
        while (!(hThread = _beginthreadex(NULL, dwStackSize,
                _dispatch_worker_thread_thunk, dq,
                STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
            if (errno != EAGAIN) {
                (void)dispatch_assume(hThread);
            }
            _dispatch_temporary_resource_shortage();
        }
        if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
            (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
        }
        CloseHandle((HANDLE)hThread);
    } while (--remaining);
#endif // defined(_WIN32)
#else
    (void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}

As the code shows, the system reads the thread pool size, computes how many threads can still be created, and then uses two do-while loops to create threads on demand; a simplified sketch of that capped spawning loop follows.
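
This is a hedged, single-threaded-bookkeeping sketch only; spawn_workers and worker_main are illustrative names, not libdispatch functions:

#include <errno.h>
#include <pthread.h>
#include <unistd.h>

static void *worker_main(void *queue) {
    (void)queue;
    // A real worker would drain items from the root queue here.
    return NULL;
}

// remaining: requested worker count; pool_size: free slots left in the pool.
static void spawn_workers(int remaining, int *pool_size, void *queue) {
    int can_request = *pool_size;
    if (remaining > can_request) {
        remaining = can_request;            // clamp the request to the pool cap
    }
    if (remaining == 0) {
        return;                             // pool is full, nothing to spawn
    }
    *pool_size -= remaining;
    do {
        pthread_t tid;
        while (pthread_create(&tid, NULL, worker_main, queue) == EAGAIN) {
            sleep(1);                       // back off, like _dispatch_temporary_resource_shortage
        }
        pthread_detach(tid);
    } while (--remaining);
}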

The complete function call stack is roughly condensed as follows: dispatch_async ➡️ _dispatch_continuation_init ➡️ _dispatch_continuation_async ➡️ dx_push ➡️ dq_push (_dispatch_root_queue_push) ➡️ _dispatch_root_queue_push_inline ➡️ _dispatch_root_queue_poke ➡️ _dispatch_root_queue_poke_slow ➡️ pthread_create.

dispatch_sync

Now let's look at how the dispatch_sync function executes.

DISPATCH_NOINLINE
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    // #define DC_FLAG_BLOCK 0x010ul
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    
    // work is the submitted block
    // _dispatch_Block_invoke(work) extracts the block's invoke function pointer
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

The dispatch_sync function internally invokes the _dispatch_sync_f function.

_dispatch_sync_f

_dispatch_sync_f is a single line that directly calls the _dispatch_sync_f_inline function.

DISPATCH_NOINLINE
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

_dispatch_sync_f_inline

_dispatch_sync_f_inline opens with a dq->dq_width == 1 check. A serial queue has a dq_width of 1, a custom concurrent queue has a dq_width of 0xffeull, and a root queue has a dq_width of 0xfffull. That is, if the dq parameter is a serial queue, _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags) is executed; if dq is a concurrent queue, the rest of the function below runs.

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    // Serial queue dq_width = 1
    if (likely(dq->dq_width == 1)) {
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }
    
    // _DISPATCH_LANE_TYPE = 0x00000011, // meta-type for lanes
    // _DISPATCH_META_TYPE_MASK = 0x000000ff, // mask for object meta-types
    // #define dx_metatype(x) (dx_vtable(x)->do_type & _DISPATCH_META_TYPE_MASK)
    
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    
    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads always
    // fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

upcast

The input is converted to the dispatch_object_t type.

DISPATCH_ALWAYS_INLINE
static inline dispatch_object_t
upcast(dispatch_object_t dou)
{
    return dou;
}

_dispatch_barrier_sync_f

Within the _dispatch_barrier_sync_f function, only _dispatch_barrier_sync_f_inline is called.

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

_dispatch_barrier_sync_f_inline

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    // Get the ID of the current thread (TLS is stored in thread storage)
    dispatch_tid tid = _dispatch_tid_self();

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // The more correct thing to do would be to merge the qos of the
    // thread that just acquired the barrier lock into the queue state.
    //
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread
    // contends with this fast path, this thread may receive a useless override.
    //
    // Global concurrent queues and queues bound to non-dispatch threads always
    // fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    
    // The deadlock check happens inside this call
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    
    // Execute the block
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

In everyday development we know that if we call dispatch_sync to submit a task to the serial queue we are currently executing on, a deadlock occurs. The cause of the deadlock is hidden in the _dispatch_queue_try_acquire_barrier_sync(dl, tid) call.

When we write the following code in viewDidLoad (which runs on the main queue, a serial queue):

dispatch_sync(mainQueue, ^{
    NSLog(@"✈️✈️✈️ crash");
});

The crash backtrace reads: 0 __DISPATCH_WAIT_FOR_QUEUE__ ⬅️ 1 _dispatch_sync_f_slow ⬅️ 2 -[ViewController viewDidLoad]…, i.e., the crash happens inside the __DISPATCH_WAIT_FOR_QUEUE__ function.
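
For contrast, a hedged example of submitting the same work without deadlocking: since viewDidLoad is already running on the main queue, submit asynchronously instead, so the current turn of the queue is not blocked waiting on itself.

dispatch_async(dispatch_get_main_queue(), ^{
    NSLog(@"✈️ runs later on the main queue, no deadlock");
});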

Let's follow the _dispatch_queue_try_acquire_barrier_sync call.

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
    return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}

The _dispatch_queue_try_acquire_barrier_sync_and_suspend function is called directly.

_dispatch_queue_try_acquire_barrier_sync_and_suspend

The _dispatch_queue_try_acquire_barrier_sync_and_suspend function calls os_atomic_rmw_loop2o to read the queue's state atomically and returns the result.

/* Used by _dispatch_barrier_{try,}sync
 *
 * Note, this fails if any of e:1 or dl!=0, but that allows this code to be
 * a simple cmpxchg which is significantly faster on Intel, and makes a
 * significant difference on the uncontended codepath.
 *
 * See discussion for DISPATCH_QUEUE_DIRTY in queue_internal.h
 *
 * Initial state must be `completely idle`
 * Final state forces { ib:1, qf:1, w:0 }
 */
 
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
        uint32_t tid, uint64_t suspend_count)
{
    // #define DISPATCH_QUEUE_STATE_INIT_VALUE(width) \
    //         ((DISPATCH_QUEUE_WIDTH_FULL - (width)) << DISPATCH_QUEUE_WIDTH_SHIFT)
    // If dq->dq_width is 1, init = 0xfffull << 41
    
    uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    
    // #define DISPATCH_QUEUE_WIDTH_FULL_BIT 0x0020000000000000ull
    // #define DISPATCH_QUEUE_IN_BARRIER 0x0040000000000000ull
    // #define DISPATCH_QUEUE_SUSPEND_INTERVAL 0x0400000000000000ull
    
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
            _dispatch_lock_value_from_tid(tid) |
            (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
            
    uint64_t old_state, new_state;
    
    // #define DISPATCH_QUEUE_ROLE_MASK 0x0000003000000000ull
    // #define os_atomic_rmw_loop2o(p, f, ov, nv, m, ...) os_atomic_rmw_loop(&(p)->f, ov, nv, m, __VA_ARGS__)
    
    // Get information from the bottom layer -- state information -- current queue -- thread
    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
        uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
        if (old_state != (init | role)) {
            os_atomic_rmw_loop_give_up(break);
        }
        new_state = value | role;
    });
}

_dispatch_lock_value_from_tid obtains a dispatch_lock from the tid with a bitwise operation.

DISPATCH_ALWAYS_INLINE
static inline dispatch_lock
_dispatch_lock_value_from_tid(dispatch_tid tid)
{
    return tid & DLOCK_OWNER_MASK;
}

Back in _dispatch_barrier_sync_f_inline: if _dispatch_sync_f_slow is executed, let's look at its contents.

_dispatch_sync_f_slow

Inside the _dispatch_sync_f_slow function we find the call __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq).

_dispatch_sync_f_slow assembles the task information, pushes it with _dispatch_trace_item_push, and stores it in the queue (FIFO) so the function can be executed.

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    
    // Task information
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };
    
    // Push the item (trace)
    _dispatch_trace_item_push(top_dq, &dsc);
    
    // ⬇️⬇️⬇️
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

__DISPATCH_WAIT_FOR_QUEUE__


The __DISPATCH_WAIT_FOR_QUEUE__ function reads the queue's state and then uses the XOR operation in _dq_state_drain_locked_by to check whether the queue's drain lock is already owned by the waiting thread. If it is, the call would wait on itself, so a deadlock crash is raised.

DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    // Get the status of the queue to see if it is in wait state
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }
    ...
}

_dq_state_drain_locked_by

DISPATCH_ALWAYS_INLINE
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
    return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
    // #define DISPATCH_QUEUE_DRAIN_OWNER_MASK ((uint64_t)DLOCK_OWNER_MASK)
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    
    // lock_value indicates the queue status and tid indicates the thread ID
    // ^ (XOR): identical bits give 0, differing bits give 1
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
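
A small worked example of that check; the mask and tid values below are made up for illustration, and the real DLOCK_OWNER_MASK may differ:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint32_t owner_mask = 0xfffffffc; // illustrative stand-in for DLOCK_OWNER_MASK
    uint32_t lock_value = 0x00001a2c; // owner recorded in the queue's dq_state
    uint32_t caller_tid = 0x00001a2c; // the thread now calling dispatch_sync again
    // XOR of identical owner bits is 0, so masking leaves 0: the caller already
    // owns the drain lock and the call would deadlock.
    bool owned_by_caller = ((lock_value ^ caller_tid) & owner_mask) == 0;
    printf("deadlock detected: %d\n", owned_by_caller); // prints 1
    return 0;
}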

dispatch_sync first obtains the tid of the current thread and the queue state returned by the lower layer, then compares the owner recorded in the queue state with that tid. If they match, the queue is waiting on the very thread that already owns it, so a deadlock is reported and the process crashes.

Having found the cause of the serial-queue deadlock, let's go back to the normal dispatch_sync flow and continue.

In the _dispatch_barrier_sync_f_inline function, the normal path calls _dispatch_lane_barrier_sync_invoke_and_complete.

_dispatch_lane_barrier_sync_invoke_and_complete

/*
 * For queues we can cheat and inline the unlock code,
 * which is invalid for objects with a more complex state machine
 * (sources or mach channels)
 */
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
        void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    ...
}

_dispatch_sync_function_invoke_inline

Inside the _dispatch_sync_function_invoke_inline function, the _dispatch_client_callout call performs the task.

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    
    // func(ctxt)
    _dispatch_client_callout(ctxt, func);
    
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

_dispatch_client_callout


DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    return f(ctxt); // ⬅️
}

DISPATCH_NOINLINE
void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    _dispatch_get_tsd_base();
    void *u = _dispatch_get_unwind_tsd();
    if (likely(!u)) return f(ctxt);
    _dispatch_set_unwind_tsd(NULL);
    f(ctxt); // ⬅️
    _dispatch_free_unwind_tsd();
    _dispatch_set_unwind_tsd(u);
}

void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    @try {
        return f(ctxt); // ⬅️
    }
    @catch (...) {
        objc_terminate();
    }
}

Here, finally, the block submitted to dispatch_sync is executed.

When dispatch_sync submits tasks to a serial queue, the complete function call stack condenses to: dispatch_sync ➡️ _dispatch_sync_f ➡️ _dispatch_sync_f_inline ➡️ _dispatch_barrier_sync_f ➡️ _dispatch_barrier_sync_f_inline ➡️ _dispatch_lane_barrier_sync_invoke_and_complete ➡️ _dispatch_sync_function_invoke_inline ➡️ _dispatch_client_callout ➡️ f(ctxt).

When dispatch_sync submits tasks to a concurrent queue, the complete function call stack condenses to: dispatch_sync ➡️ _dispatch_sync_f ➡️ _dispatch_sync_f_inline ➡️ _dispatch_sync_invoke_and_complete ➡️ _dispatch_sync_function_invoke_inline ➡️ _dispatch_client_callout ➡️ f(ctxt).

Reference links 🔗

  • Libdispatch Apple source code
  • GCD source code analysis 1 — the opening chapter
  • Dig up the libDispatch source code
  • GCD source code analysis
  • A few things about GCD development
  • GCD in-depth understanding: Part ONE
  • dispatch_once
  • Transparent union types
  • Perversion of libDispatch -dispatch_object_s
  • The basics of GCD
  • Swift multithreading from the source analysis – DispatchGroup
  • GCD source code analysis (a)
  • GCD source code analysis
  • GCD underlying source code analysis
  • (1) — GCD Queue
  • C/C++: counting the number of arguments of the variadic macro __VA_ARGS__
  • OC low-level exploration (21): GCD async, GCD sync, singletons, semaphores, dispatch groups, barrier functions and other low-level analysis