This is the 18th day of my participation in the August Genwen Challenge.

1. Singleton pattern

In GCD, a singleton is typically created with dispatch_once

static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
    NSLog(@"block-- : %@", [NSThread currentThread]);
});

Go to the source code and find the implementation of the dispatch_once function

void dispatch_once(dispatch_once_t *val, dispatch_block_t block) {
    dispatch_once_f(val, block, _dispatch_Block_invoke(block)); 
}

Step into the dispatch_once_f function

void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) {
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}
  • val is cast to the dispatch_once_gate_t type, which is used as a gate, much like a barrier

There are three conditional branches:

  • If the task has already been executed, return immediately
  • If this is the first call, execute the _dispatch_once_callout function
  • If the task is currently executing on another thread, enter _dispatch_once_wait and wait

1.1 Lock processing

Step into the _dispatch_once_gate_tryenter function

static inline bool _dispatch_once_gate_tryenter(dispatch_once_gate_t l) { 
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, (uintptr_t)_dispatch_lock_value_for_self(), relaxed); 
}
  • An atomic compare-and-swap takes the lock, so only one thread can enter even when several threads call at the same time

1.2 Performing Tasks

Step into the _dispatch_once_callout function

static void _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt, dispatch_function_t func) {
    _dispatch_client_callout(ctxt, func);
    _dispatch_once_gate_broadcast(l);
}

void _dispatch_client_callout(void *ctxt, dispatch_function_t f) {
    _dispatch_get_tsd_base();
    void *u = _dispatch_get_unwind_tsd();
    if (likely(!u)) return f(ctxt);
    _dispatch_set_unwind_tsd(NULL);
    f(ctxt);
    _dispatch_free_unwind_tsd();
    _dispatch_set_unwind_tsd(u);
}
  • The task's callback is invoked through f(ctxt)

Step into the _dispatch_once_gate_broadcast function

static inline void _dispatch_once_gate_broadcast(dispatch_once_gate_t l) { 
    dispatch_lock value_self = _dispatch_lock_value_for_self(); 
    uintptr_t v; 
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER 
    v = _dispatch_once_mark_quiescing(l); 
#else 
    v = _dispatch_once_mark_done(l); 
#endif 
    if (likely((dispatch_lock)v == value_self)) return; 
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
  • Releases the lock and marks the task as done

How the singleton works:

  • Call the dispatch_once function, passing in onceToken and a block. onceToken is a static variable and therefore unique; internally it is cast to a dispatch_once_gate_t variable l. The os_atomic_load function reads the atomically maintained state from l into the variable v, which describes the task status. If v equals DLOCK_ONCE_DONE, the task has already been handled once and the function returns immediately
  • If the task is being executed for the first time, the gate is locked by atomically changing its state from DLOCK_ONCE_UNLOCKED to the lock value of the current thread, which ensures thread safety. After the block callback has run, the gate is unlocked and its state is set to DLOCK_ONCE_DONE, so the next call does not execute the block and returns immediately
  • If other threads call in while the task is still executing, they wait with no timeout. The reason is that the current thread holds the lock, so the others cannot acquire it until the task is done
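
The three branches above can be sketched in portable C11. This is only an illustration under stated assumptions, not libdispatch's code: once_gate_t, once_f, count_run, demo_once and the ONCE_* values are hypothetical names, and the waiting branch simply spins, whereas libdispatch parks the waiting thread.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical state values standing in for DLOCK_ONCE_UNLOCKED / DLOCK_ONCE_DONE. */
#define ONCE_UNLOCKED ((uintptr_t)0)
#define ONCE_RUNNING  ((uintptr_t)1)   /* libdispatch stores the owner's lock value here */
#define ONCE_DONE     (~(uintptr_t)0)

typedef struct { _Atomic uintptr_t state; } once_gate_t;

/* Runs func(ctxt) at most once per gate. */
static void once_f(once_gate_t *gate, void *ctxt, void (*func)(void *)) {
    assert(gate != NULL && func != NULL);
    /* Branch 1: already done, return immediately. */
    if (atomic_load_explicit(&gate->state, memory_order_acquire) == ONCE_DONE) {
        return;
    }
    /* Branch 2: the first caller wins the compare-and-swap and runs the task. */
    uintptr_t expected = ONCE_UNLOCKED;
    if (atomic_compare_exchange_strong(&gate->state, &expected, ONCE_RUNNING)) {
        func(ctxt);
        atomic_store_explicit(&gate->state, ONCE_DONE, memory_order_release);
        return;
    }
    /* Branch 3: another thread is running the task; wait until it is done. */
    while (atomic_load_explicit(&gate->state, memory_order_acquire) != ONCE_DONE) {
    }
}

/* Example task: counts how many times it was invoked. */
static void count_run(void *ctxt) { ++*(int *)ctxt; }

/* Calls once_f three times on one gate and returns how often the task ran. */
static int demo_once(void) {
    once_gate_t gate;
    atomic_init(&gate.state, ONCE_UNLOCKED);
    int runs = 0;
    once_f(&gate, &runs, count_run);
    once_f(&gate, &runs, count_run);
    once_f(&gate, &runs, count_run);
    return runs;
}
```

Calling demo_once() exercises the first two branches from a single thread: the first call runs the task, and the later calls see the done state and return.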

2. Thread pool

2.1 Creating a Thread

For an asynchronous function, inside _dispatch_root_queue_poke_slow, if the queue is a global queue, threads are created and run through the _pthread_workqueue_addthreads function

#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
    {
        _dispatch_root_queue_debug("requesting new worker thread for global "
                "queue: %p", dq);
        r = _pthread_workqueue_addthreads(remaining,
                _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
        (void)dispatch_assume_zero(r);
        return;
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE

If it is an ordinary queue, a do…while loop is used to create the thread pool. Before the threads are created, the state of the pool is checked

int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
    can_request = t_count < floor ? 0 : t_count - floor;
    if (remaining > can_request) {
        _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                remaining, can_request);
        os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
        remaining = can_request;
    }
    if (remaining == 0) {
        _dispatch_root_queue_debug("pthread pool is full for root queue: "
                "%p", dq);
        return;
    }
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
        t_count - remaining, &t_count, acquire));
  • Check dgq_thread_pool_size, which is marked as 1
  • dgq_thread_pool_size is incremented by the surrounding logic until it reaches the maximum
  • remaining and floor are passed in as parameters, with the values 1 and 0
  • Compute can_request: if t_count is less than floor, it is 0; otherwise it is t_count minus floor
  • If remaining is greater than can_request, the pthread pool reduces the request to can_request threads
  • If remaining is 0, the pthread pool of the root queue is full
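
The reservation loop above can be sketched with C11 atomics. This is a simplified illustration, not the libdispatch code: reserve_threads is a hypothetical name, the pool counter is a plain _Atomic int, and the dgq_pending bookkeeping and debug logging are left out.

```c
#include <assert.h>
#include <stdatomic.h>

/* Tries to reserve `remaining` threads from an atomic pool counter.
   Returns how many threads were reserved (0 if the pool is full). */
static int reserve_threads(_Atomic int *pool_size, int remaining, int floor) {
    assert(remaining >= 0 && floor >= 0);
    int t_count = atomic_load(pool_size);
    int can_request;
    do {
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            remaining = can_request;   /* reduce the request to what is available */
        }
        if (remaining == 0) {
            return 0;                  /* pool is full */
        }
        /* If the swap fails, t_count is refreshed with the current value
           and the request is recomputed on the next iteration. */
    } while (!atomic_compare_exchange_weak(pool_size, &t_count, t_count - remaining));
    return remaining;
}
```

With a pool of 3 and a floor of 0, a request for 2 threads is granted in full, a second request for 2 is reduced to 1, and any further request returns 0.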

Create the threads using the pthread_create function

do {
    _dispatch_retain(dq); // released in _dispatch_worker_thread
    while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
        if (r != EAGAIN) {
            (void)dispatch_assume_zero(r);
        }
        _dispatch_temporary_resource_shortage();
    }
} while (--remaining);

2.2 Maximum number of threads

Set the maximum number of threads in the thread pool

int thread_pool_size = DISPATCH_WORKQ_MAX_PTHREAD_COUNT; 
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
  • The maximum number of threads is set to 255, but the actual number of threads in the program may not reach this maximum

According to the official documentation, a secondary thread's stack is 512KB by default, the minimum stack size allowed for a secondary thread is 16KB, and the stack size must be a multiple of 4KB. When the program starts, the system provides 4GB of virtual memory, 3GB for user mode and 1GB for kernel mode. However, not all of the 1GB of kernel-mode space can be used for creating threads, so the actual maximum number of threads is unknown

do {
    _dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
    unsigned dwStackSize = 0;
#else
    unsigned dwStackSize = 64 * 1024;
#endif
    uintptr_t hThread = 0;
    while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
        if (errno != EAGAIN) {
            (void)dispatch_assume(hThread);
        }
        _dispatch_temporary_resource_shortage();
    }
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
        (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
    }
#endif
    CloseHandle((HANDLE)hThread);
} while (--remaining);
  • If the full 1GB of kernel-mode space were used, the minimum stack size of 16KB would allow at most 64 * 1024 threads, and the default secondary-thread stack of 512KB would allow at most 2048 threads
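
The arithmetic behind these two figures, as a small sketch. max_threads and the KB and GB constants are names introduced here for illustration, and the result is only an upper bound under the assumption that the whole 1GB is spent on thread stacks.

```c
#include <assert.h>
#include <stdint.h>

#define KB 1024ULL
#define GB (1024ULL * 1024ULL * 1024ULL)

/* Upper bound on the thread count if `space` bytes held nothing but stacks. */
static uint64_t max_threads(uint64_t space, uint64_t stack_size) {
    assert(stack_size > 0);
    return space / stack_size;
}
```

max_threads(1 * GB, 16 * KB) gives 65536, which is 64 * 1024, and max_threads(1 * GB, 512 * KB) gives 2048.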

3. Barrier functions

There are two barrier functions in iOS, both used to control the order in which tasks are executed

  • dispatch_barrier_async: the asynchronous barrier function; its block runs only after the tasks submitted before it have completed
  • dispatch_barrier_sync: the synchronous barrier function; it has the same effect as the asynchronous one, but it also blocks the calling thread, which delays the code that follows

Things to keep in mind when using barrier functions:

  • A barrier function only controls tasks in the same concurrent queue
  • With the synchronous barrier function, the calling thread is blocked until the tasks before the barrier and the barrier task itself have finished executing
  • Global concurrent queues do not support barrier functions, because a barrier there could interfere with system-level tasks
  • On a serial queue, a barrier function behaves like an ordinary dispatch call, so using it serves no purpose

Barrier functions can also be used for thread safety, similar to a lock

dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i < 10000; i++) {
    dispatch_async(concurrentQueue, ^{
        dispatch_barrier_async(concurrentQueue, ^{
            [self.mArray addObject:[NSString stringWithFormat:@"%d", i]];
        });
        // @synchronized (self) {
        //     [self.mArray addObject:[NSString stringWithFormat:@"%d", i]];
        // }
    });
}
  • In this example, without the barrier or a mutex, calling addObject on the same array from multiple threads of a concurrent queue is likely to crash
  • This is because a write essentially releases the old value and retains the new value
  • When values are released and retained continuously from multiple threads, a release can start before the matching retain has finished, which amounts to releasing a value that was never fully stored

3.1 Analysis of the synchronous barrier function

In the source, find the implementation of the dispatch_barrier_sync function

void dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work) { 
    uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK; 
    if (unlikely(_dispatch_block_has_private_data(work))) { 
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags); 
    } 
    _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags); 
}

_dispatch_barrier_sync_f –> _dispatch_barrier_sync_f_inline

static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) {
    dispatch_tid tid = _dispatch_tid_self();
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    // The more correct thing to do would be to merge the qos of the thread
    // that just acquired the barrier lock into the queue state.
    //
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread
    // contends with this fast path, this thread may receive a useless override.
    //
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, DC_FLAG_BARRIER | dc_flags);
    }
    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
  • The logic contains a path into the _dispatch_sync_f_slow function, which shows that the synchronous barrier function can also deadlock

_dispatch_sync_recurse –> _dispatch_sync_invoke_and_complete_recurse –> _dispatch_sync_complete_recurse

static void _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq, uintptr_t dc_flags) {
    bool barrier = (dc_flags & DC_FLAG_BARRIER);
    do {
        if (dq == stop_dq) return;
        if (barrier) {
            dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
        } else {
            _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
        }
        dq = dq->do_targetq;
        barrier = (dq->dq_width == 1);
    } while (unlikely(dq->do_targetq));
}
  • If a barrier is present at the current level of the target queue chain, call dx_wakeup with DISPATCH_WAKEUP_BARRIER_COMPLETE to wake the queue
  • Otherwise, call the _dispatch_lane_non_barrier_complete function

The dx_wakeup implementation for concurrent queues

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane, 
    .do_type = DISPATCH_QUEUE_CONCURRENT_TYPE, 
    .do_dispose = _dispatch_lane_dispose, 
    .do_debug = _dispatch_queue_debug, 
    .do_invoke = _dispatch_lane_invoke, 
    .dq_activate = _dispatch_lane_activate, 
    .dq_wakeup = _dispatch_lane_wakeup, 
    .dq_push = _dispatch_lane_concurrent_push,
);

Enter the _dispatch_lane_wakeup function

void _dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags) { 
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE; 
    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) { 
        return _dispatch_lane_barrier_complete(dqu, qos, flags); 
    } 
    if (_dispatch_queue_class_probe(dqu)) { 
        target = DISPATCH_QUEUE_WAKEUP_TARGET; 
    } 
    return _dispatch_queue_wakeup(dqu, qos, flags, target); 
}
  • If the barrier-complete flag is set, enter the _dispatch_lane_barrier_complete function

Enter the _dispatch_lane_barrier_complete function

static void _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos, dispatch_wakeup_flags_t flags) {
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    dispatch_lane_t dq = dqu._dl;
    if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
        struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
        if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
            if (_dispatch_object_is_waiter(dc)) {
                return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
            }
        } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
            return _dispatch_lane_drain_non_barriers(dq, dc, flags);
        }
        if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    uint64_t owned = DISPATCH_QUEUE_IN_BARRIER + dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
    return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
  • If it is a serial queue, the barrier acts like a synchronous function, and the _dispatch_lane_drain_barrier_waiter function is called
  • If it is a concurrent queue, the _dispatch_lane_drain_non_barriers function is called for barrier-related processing
  • Once all tasks before the barrier are complete, the _dispatch_lane_class_barrier_complete function is called
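
The branch selection in _dispatch_lane_barrier_complete can be restated as a pure function. This is a sketch for reading the control flow only: lane_view_t, barrier_path_t and barrier_complete_path are hypothetical names, and the struct is an assumed flattened view of the fields the real function inspects.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
    PATH_DRAIN_BARRIER_WAITER,   /* _dispatch_lane_drain_barrier_waiter */
    PATH_DRAIN_NON_BARRIERS,     /* _dispatch_lane_drain_non_barriers */
    PATH_CLASS_BARRIER_COMPLETE  /* _dispatch_lane_class_barrier_complete */
} barrier_path_t;

/* Hypothetical flattened view of the queue state read by the real function. */
typedef struct {
    bool has_items;        /* dq_items_tail is not NULL */
    bool suspended;        /* DISPATCH_QUEUE_IS_SUSPENDED(dq) */
    int  width;            /* dq_width, 1 for a serial queue */
    bool head_is_barrier;  /* _dispatch_object_is_barrier(head) */
    bool head_is_waiter;   /* _dispatch_object_is_waiter(head) */
} lane_view_t;

/* Mirrors the order of the checks in the quoted source. */
static barrier_path_t barrier_complete_path(lane_view_t q) {
    assert(q.width >= 1);
    if (q.has_items && !q.suspended) {
        if (q.width == 1 || q.head_is_barrier) {
            if (q.head_is_waiter) {
                return PATH_DRAIN_BARRIER_WAITER;
            }
        } else if (q.width > 1 && !q.head_is_barrier) {
            return PATH_DRAIN_NON_BARRIERS;
        }
    }
    return PATH_CLASS_BARRIER_COMPLETE;
}
```

A serial queue whose head item is a blocked sync caller takes the first path, a concurrent queue whose head is an ordinary task takes the second, and an empty or suspended queue goes straight to the final call.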

3.2 Barrier functions in global queues

The dx_wakeup implementation for global queues

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane, 
    .do_type         = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE, 
    .do_dispose      = _dispatch_object_no_dispose, 
    .do_debug        = _dispatch_queue_debug, 
    .do_invoke       = _dispatch_object_no_invoke, 
    .dq_activate     = _dispatch_queue_no_activate, 
    .dq_wakeup       = _dispatch_root_queue_wakeup, 
    .dq_push         = _dispatch_root_queue_push, 
);

Enter the _dispatch_root_queue_wakeup function

void _dispatch_root_queue_wakeup(dispatch_queue_global_t dq, DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags) {
    if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) { 
        DISPATCH_INTERNAL_CRASH(dq->dq_priority, "Don't try to wake up or override a root queue"); 
    } 
    if (flags & DISPATCH_WAKEUP_CONSUME_2) { 
        return _dispatch_release_2_tailcall(dq); 
    } 
}
  • The global queue has no check or handling for barriers. Therefore, a barrier function in a global queue is no different from an ordinary synchronous or asynchronous function

4. Semaphores

Semaphores let asynchronous tasks execute in a synchronous order, can be used as locks, and can limit the maximum concurrency in GCD

// queue is assumed to be a dispatch queue created earlier
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
for (int i = 0; i < 10; i++) {
    dispatch_async(queue, ^{
        sleep(1);
        NSLog(@"current - %d, thread - %@", i, [NSThread currentThread]);
        dispatch_semaphore_signal(sem);
    });
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
}

  • dispatch_semaphore_create: initializes the semaphore
  • dispatch_semaphore_wait: waits, subtracts 1 from the semaphore, equivalent to locking
  • dispatch_semaphore_signal: releases, adds 1 to the semaphore, equivalent to unlocking

4.1 Create

Step into the dispatch_semaphore_create function

dispatch_semaphore_t dispatch_semaphore_create(intptr_t value) { 
    dispatch_semaphore_t dsema; 
    // If the internal value is negative, then the absolute of the value is 
    // equal to the number of waiting threads. Therefore it is bogus to 
    // initialize the semaphore with a negative value. 
    if (value < 0) { 
        return DISPATCH_BAD_INPUT; 
    } 
    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)); 
    dsema->do_next = DISPATCH_OBJECT_LISTLESS; 
    dsema->do_targetq = _dispatch_get_default_queue(false); 
    dsema->dsema_value = value; 
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); 
    dsema->dsema_orig = value; 
    return dsema; 
}
  • Initializes the semaphore and sets the maximum concurrency for GCD
  • The initial value must be greater than or equal to 0

4.2 Wait

Enter the dispatch_semaphore_wait function

intptr_t dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { 
    long value = os_atomic_dec2o(dsema, dsema_value, acquire); 
    if (likely(value >= 0)) { 
        return 0; 
    } 
    return _dispatch_semaphore_wait_slow(dsema, timeout); 
}
  • The os_atomic_dec2o macro subtracts 1 from the semaphore value
  • If the value is >= 0, return 0 and continue with the code after the wait
  • If the value is < 0, the current thread is blocked and enters the _dispatch_semaphore_wait_slow function

Enter the _dispatch_semaphore_wait_slow function

static intptr_t _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
    long orig;
    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW:
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}
  • Different logic is applied depending on the value of timeout
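
The DISPATCH_TIME_NOW case is the least obvious part: after a timeout, the decrement made by the fast path has to be undone unless a signal has arrived in the meantime. A minimal C11 sketch of that step, where undo_wait_on_timeout is a hypothetical name and the counter is a plain _Atomic long rather than the dsema_value field:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Called after a timed wait has expired. Returns true if the earlier decrement
   was undone (the wait really timed out), false if a signal arrived in the
   meantime and the caller should consume that wakeup instead. */
static bool undo_wait_on_timeout(_Atomic long *value) {
    assert(value != NULL);
    long orig = atomic_load_explicit(value, memory_order_relaxed);
    while (orig < 0) {
        if (atomic_compare_exchange_weak_explicit(value, &orig, orig + 1,
                memory_order_relaxed, memory_order_relaxed)) {
            return true;
        }
        /* The swap failed: orig now holds the fresh value, so re-check its sign. */
    }
    return false;
}
```

With the counter at -1 (one waiter, no signal), the function restores it to 0 and reports a timeout. With the counter already at 0, it leaves the value alone and returns false, which corresponds to falling through to the wait that drains the pending wakeup.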

If timeout is DISPATCH_TIME_FOREVER, enter the _dispatch_sema4_wait function

void _dispatch_sema4_wait(_dispatch_sema4_t *sema) { 
    int ret = 0; 
    do { 
        ret = sem_wait(sema); 
    } while (ret == -1 && errno == EINTR); 
    DISPATCH_SEMAPHORE_VERIFY_RET(ret); 
}
  • The core is the do...while loop: while the thread is waiting inside it, the code that follows cannot execute

4.3 Release

Step into the dispatch_semaphore_signal function

intptr_t dispatch_semaphore_signal(dispatch_semaphore_t dsema) { 
    long value = os_atomic_inc2o(dsema, dsema_value, release); 
    if (likely(value > 0)) {
        return 0; 
    } 
    if (unlikely(value == LONG_MIN)) { 
        DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()"); 
    } 
    return _dispatch_semaphore_signal_slow(dsema); 
}
  • The os_atomic_inc2o macro adds 1 to the semaphore value
  • If the value is > 0, return 0 and continue with the rest of the code
  • If the value equals LONG_MIN, the process crashes with an "Unbalanced call" error, because the calls to signal and wait do not match. Otherwise the _dispatch_semaphore_signal_slow function is called to wake a waiting thread
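
The fast paths of wait and signal come down to one atomic counter. The sketch below models only that counter, under the assumption that blocking and waking happen elsewhere: counting_sema_t, sema_wait_fast and sema_signal_needs_wakeup are hypothetical names, not libdispatch functions.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct { _Atomic long value; } counting_sema_t;

/* Decrements the counter. Returns true if the caller may proceed at once,
   false if it would have to block (the slow path). */
static bool sema_wait_fast(counting_sema_t *s) {
    assert(s != NULL);
    long value = atomic_fetch_sub_explicit(&s->value, 1, memory_order_acquire) - 1;
    return value >= 0;
}

/* Increments the counter. Returns true if a blocked waiter has to be woken
   (the slow path), false if nobody was waiting. */
static bool sema_signal_needs_wakeup(counting_sema_t *s) {
    assert(s != NULL);
    long value = atomic_fetch_add_explicit(&s->value, 1, memory_order_release) + 1;
    return value <= 0;
}
```

Starting from a value of 1, the first wait proceeds and the second would block, leaving the counter at -1. The next signal then reports that a waiter needs waking, and a further signal does not.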