Multithreading: GCD underlying source code
This chapter mainly explores the underlying source code of GCD, covering:
- The underlying principle of the GCD barrier function
- The underlying principle of GCD semaphores
- The underlying principle of GCD dispatch groups
- GCD dispatch_source
Preparation: download the libdispatch source code so you can follow along.
1. Exploring the underlying principle of the GCD barrier function
The most direct use of the barrier function is to control the order in which tasks execute. Two barrier functions are commonly used in GCD:
- The synchronous barrier function dispatch_barrier_sync: the barrier block runs only after the preceding tasks have completed, and it additionally blocks the current thread, so it also holds back the tasks that come after it on that thread
- The asynchronous barrier function dispatch_barrier_async: the barrier block runs only after the preceding tasks in the queue have completed, without blocking the current thread
Notes on the barrier function:
- A barrier function can only control a single concurrent queue
- When a synchronous barrier is added to the queue, the current thread is locked until the tasks ahead of the barrier and the barrier block itself have finished executing; only then does the current thread continue to the next line of code
- A barrier function only makes sense on a custom concurrent queue. On a serial queue or on a system-provided global concurrent queue, the barrier behaves like an ordinary dispatch function
For example, suppose there are four tasks and the first two have a dependency: task 2 must run after task 1 finishes. This is a case for the barrier function.
The asynchronous barrier function does not block the main thread; what it blocks is the queue:
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.ypy.barrier", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
NSLog(@"task 1");
});
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"dispatch_barrier_async task 2");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"task 3");
});
NSLog(@"task 4");
[34852:1997051] task 4
2021-08-04 15:54:36.465721+0800 001-- Function and queue [34852:1997229] task 1
2021-08-04 15:54:36.465834+0800 001-- Function and queue [34852:1997229] dispatch_barrier_async task 2
2021-08-04 15:54:36.465937+0800 001-- Function and queue [34852:1997229] task 3
The synchronous barrier function blocks the current thread -- here, the main thread:
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.ypy.barrier", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
NSLog(@"task 1");
});
dispatch_barrier_sync(concurrentQueue, ^{
NSLog(@"dispatch_barrier_sync task 2");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"task 3");
});
NSLog(@"task 4");
2021-08-04 15:57:38.096087+0800 001-- Function and queue [34897:2000657] task 1
001-- Function and queue [34897:2000511] dispatch_barrier_sync task 2
2021-08-04 15:57:38.096253+0800 001-- Function and queue [34897:2000511] task 4
2021-08-04 15:57:38.096269+0800 001-- Function and queue [34897:2000657] task 3
Summary:
- The asynchronous barrier function blocks the queue, which must be a custom concurrent queue; it does not affect the main thread's execution of other tasks
- The synchronous barrier function blocks the current thread -- here the main thread -- so it does affect the main thread's execution of other tasks
Usage scenarios: besides expressing task dependencies, the barrier function can also be used for data safety. NSMutableArray is not thread-safe, so an operation like the following will crash:
NSMutableArray *array = [[NSMutableArray alloc] init];
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.ypy.barrier", DISPATCH_QUEUE_CONCURRENT);
for (NSInteger index = 0; index < 1000; index++) {
    dispatch_async(concurrentQueue, ^{
        [array addObject:[NSString stringWithFormat:@"%ld",(long)index]];
    });
}
To fix the crash:
- Add a barrier function
NSMutableArray *array = [[NSMutableArray alloc] init];
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.ypy.barrier", DISPATCH_QUEUE_CONCURRENT);
for (NSInteger index = 0; index < 1000; index++) {
    dispatch_async(concurrentQueue, ^{
        dispatch_barrier_async(concurrentQueue, ^{
            [array addObject:[NSString stringWithFormat:@"%ld",(long)index]];
        });
    });
}
- @synchronized (self) {}
NSMutableArray *array = [[NSMutableArray alloc] init];
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.ypy.barrier", DISPATCH_QUEUE_CONCURRENT);
for (NSInteger index = 0; index < 1000; index++) {
    dispatch_async(concurrentQueue, ^{
        @synchronized (self) {
            [array addObject:[NSString stringWithFormat:@"%ld",(long)index]];
        }
    });
}
Notes:
- If you use a global queue with a barrier function, the barrier does not take effect: the system itself also uses the global concurrent queue, so the barrier cannot be allowed to hold up system tasks, and per Apple's documentation dispatch_barrier_async on a global queue simply behaves like dispatch_async (see the sketch after this list)
- If you change the custom concurrent queue to a serial queue, the barrier is pointless: a serial queue is already sequential, so adding a barrier only wastes performance
- A barrier function blocks the queue only once
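A minimal sketch of the global-queue pitfall described above (the log statements are illustrative): the barrier block below is not guaranteed to wait for task 1 or to hold back task 3, because on a global queue dispatch_barrier_async degrades to a plain dispatch_async.
dispatch_queue_t globalQueue = dispatch_get_global_queue(0, 0);
dispatch_async(globalQueue, ^{
    NSLog(@"task 1");
});
// On a global queue this provides NO barrier semantics --
// it is scheduled like an ordinary dispatch_async
dispatch_barrier_async(globalQueue, ^{
    NSLog(@"barrier task 2");
});
dispatch_async(globalQueue, ^{
    NSLog(@"task 3");
});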
1.1 dispatch_barrier_async Underlying analysis
#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc_flags);
}
#endif
The dispatch_barrier_async source implementation closely parallels dispatch_async, so it is not analyzed again here; interested readers can explore it along the lines below.
1.2 dispatch_barrier_sync Underlying analysis
Entering the source code of dispatch_barrier_sync, the implementation is as follows:
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
1.2.1 _dispatch_barrier_sync_f_inline
Follow _dispatch_barrier_sync_f -> _dispatch_barrier_sync_f_inline into the source:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self(); // Get the thread id, the thread's unique identifier
    ...
    // Try to acquire the barrier; if that fails, wait
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags); // not reclaimed
    }
    // Verify whether a target queue exists; if it does, recurse
    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER))); // execute
}
The source splits into the following parts:
- Get the thread id with _dispatch_tid_self
- Judge the thread state with _dispatch_queue_try_acquire_barrier_sync
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}
- Step into _dispatch_queue_try_acquire_barrier_sync_and_suspend, where the lock state is released:
/* Used by _dispatch_barrier_{try,}sync
 *
 * Note, this fails if any of e:1 or dl!=0, but that allows this code to be a
 * simple cmpxchg which is significantly faster on Intel, and makes a
 * significant difference on the uncontended codepath.
 *
 * See discussion for DISPATCH_QUEUE_DIRTY in queue_internal.h
 *
 * Initial state must be `completely idle`
 * Final state forces { ib:1, qf:1, w:0 }
 */
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
        uint32_t tid, uint64_t suspend_count)
{
    uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
            _dispatch_lock_value_from_tid(tid) |
            (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    uint64_t old_state, new_state;

    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
        uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
        if (old_state != (init | role)) {
            os_atomic_rmw_loop_give_up(break); // release first
        }
        new_state = value | role;
    });
}
- Recursively find the barrier function's target queue with _dispatch_sync_recurse:
DISPATCH_NOINLINE
static void
_dispatch_sync_recurse(dispatch_lane_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();
    dispatch_queue_t tq = dq->do_targetq;

    do {
        if (likely(tq->dq_width == 1)) {
            if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
                return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq,
                        DC_FLAG_BARRIER);
            }
        } else {
            dispatch_queue_concurrent_t dl = upcast(tq)._dl;
            if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
                return _dispatch_sync_f_slow(dq, ctxt, func, dc_flags, tq, 0);
            }
        }
        tq = tq->do_targetq;
    } while (unlikely(tq->do_targetq));

    _dispatch_introspection_sync_begin(dq);
    _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags)));
}
- Process introspection information with _dispatch_introspection_sync_begin:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_introspection_sync_begin(dispatch_queue_class_t dq)
{
    if (!_dispatch_introspection.debug_queue_inversions) return;
    _dispatch_introspection_order_record(dq._dq); // record ordering
}
- Execute and complete the block with _dispatch_lane_barrier_sync_invoke_and_complete:
/*
 * - For queues we can cheat and inline the unlock code, which is invalid
 * - for objects with a more complex state machine (sources or mach channels)
 */
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
        void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
    if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
        return _dispatch_lane_barrier_complete(dq, 0, 0);
    }

    // Presence of any of these bits requires more work that only
    // _dispatch_*_barrier_complete() handles properly
    //
    // Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
    // checking the role is sloppy, but is a super fast check, and neither of
    // these bits should be set if the lock was never contended/discovered.
    const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
            DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
            DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
            DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
    uint64_t old_state, new_state;

    // similar to _dispatch_queue_drain_try_unlock
    os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
        new_state  = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
        new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
        new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
        if (unlikely(old_state & fail_unlock_mask)) {
            os_atomic_rmw_loop_give_up({
                // Tell the barrier that execution is complete
                return _dispatch_lane_barrier_complete(dq, 0, 0);
            });
        }
    });
    if (_dq_state_is_base_wlh(old_state)) {
        _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
    }
}
2. Exploring the underlying principle of GCD semaphores
Semaphores are generally used to synchronize tasks, similar to a mutex, and let you cap GCD's maximum concurrency as needed. Typical usage:
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
dispatch_semaphore_signal(sem);
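For instance, a semaphore created with an initial value of 2 caps execution at two concurrent tasks -- a minimal sketch (the queue choice and task bodies are illustrative):
// Allow at most 2 tasks to run at the same time
dispatch_semaphore_t sem = dispatch_semaphore_create(2);
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
for (int i = 0; i < 6; i++) {
    dispatch_async(queue, ^{
        // Blocks while two tasks are already running (the value would go below 0)
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
        NSLog(@"task %d", i);
        sleep(1);
        dispatch_semaphore_signal(sem); // value++, wakes one waiting task
    });
}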
2.1 dispatch_semaphore_create
The underlying implementation of this function initializes the semaphore and sets GCD's maximum concurrency; the value must not be negative:
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
2.2 dispatch_semaphore_wait (lock)
The function is implemented as follows. Its core is decrementing the semaphore value dsema via os_atomic_dec2o, which internally executes the C++11 atomic_fetch_sub_explicit method:
- If the decremented value is greater than or equal to 0, the function returns 0 immediately: no waiting is needed
- If the value is less than 0, the function enters the long wait _dispatch_semaphore_wait_slow
(The LONG_MIN overflow crash for unbalanced use is detected on the signal side, as shown in 2.3)
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = os_atomic_dec2o(dsema, dsema_value, acquire); // value--
    if (likely(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout); // long wait
}
Expanding the macro definition of os_atomic_dec2o step by step:
#define os_atomic_dec2o(p, f, m) \
os_atomic_sub2o(p, f, 1, m)
#define os_atomic_sub2o(p, f, v, m) \
os_atomic_sub(&(p)->f, (v), m)
#define os_atomic_sub(p, v, m) \
_os_atomic_c11_op((p), (v), m, sub, -)
#define _os_atomic_c11_op(p, v, m, o, op) \
({ _os_atomic_basetypeof(p) _v = (v), _r = \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
memory_order_##m); (__typeof__(_r))(_r op _v); })
Substituting the actual arguments gives:
os_atomic_dec2o(dsema, dsema_value, acquire)
os_atomic_sub2o(dsema, dsema_value, 1, m)
os_atomic_sub(&(dsema)->dsema_value, 1, m)
_os_atomic_c11_op(&(dsema)->dsema_value, 1, m, sub, -)
_r = atomic_fetch_sub_explicit(&dsema->dsema_value, 1),
// equivalent to dsema->dsema_value - 1
- Step into the source implementation of _dispatch_semaphore_wait_slow: when the value is less than 0, different operations are performed depending on the wait timeout:
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW:
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}
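The DISPATCH_TIME_NOW branch above is what makes a finite timeout useful: when the deadline passes, the decrement is undone and _DSEMA4_TIMEOUT is returned to the caller. A minimal usage sketch checking the return value (the 2-second deadline is illustrative):
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
// Wait at most 2 seconds for a signal
dispatch_time_t deadline = dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC);
if (dispatch_semaphore_wait(sem, deadline) != 0) {
    // Timeout path: the decrement was undone and no signal arrived in time
    NSLog(@"timed out waiting for the semaphore");
}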
2.3 dispatch_semaphore_signal (unlock)
The source implementation of the function is as follows. Its core is the ++ operation on the value via os_atomic_inc2o, which internally uses the C++11 atomic_fetch_add_explicit:
- If the incremented value is greater than 0, there are no waiters and the function returns 0 immediately
- If the value is less than or equal to 0, there are waiting threads, and _dispatch_semaphore_signal_slow is entered to wake one; if the value equals LONG_MIN, the system raises an "Unbalanced call" crash
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    // value++
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) { // no waiters, return directly
        return 0;
    }
    if (unlikely(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema); // slow path: wake a waiter
}
Expanding the macro definition of os_atomic_inc2o likewise:
#define os_atomic_inc2o(p, f, m) \
os_atomic_add2o(p, f, 1, m)
#define os_atomic_add2o(p, f, v, m) \
os_atomic_add(&(p)->f, (v), m)
#define os_atomic_add(p, v, m) \
_os_atomic_c11_op((p), (v), m, add, +)
#define _os_atomic_c11_op(p, v, m, o, op) \
({ _os_atomic_basetypeof(p) _v = (v), _r = \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
memory_order_##m); (__typeof__(_r))(_r op _v); })
Substituting the actual arguments gives:
os_atomic_inc2o(dsema, dsema_value, release)
os_atomic_add2o(dsema, dsema_value, 1, m)
os_atomic_add(&(dsema)->dsema_value, (1), m)
_os_atomic_c11_op(&(dsema)->dsema_value, (1), m, add, +)
_r = atomic_fetch_add_explicit(&dsema->dsema_value, 1),
// equivalent to dsema->dsema_value + 1
2.4 Summary
- dispatch_semaphore_create mainly initializes the semaphore value (the concurrency limit)
- dispatch_semaphore_wait performs a -- operation on the semaphore's value, i.e. locking
- dispatch_semaphore_signal performs a ++ operation on the semaphore's value, i.e. unlocking
In summary, the underlying operations of the semaphore-related functions are shown in the figure.
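As an aside, since a semaphore with an initial value of 1 behaves like the mutex mentioned at the start of this section, it offers a third fix for the NSMutableArray crash from section 1 -- a sketch reusing that example's setup:
NSMutableArray *array = [[NSMutableArray alloc] init];
dispatch_queue_t concurrentQueue = dispatch_queue_create("com.ypy.barrier", DISPATCH_QUEUE_CONCURRENT);
dispatch_semaphore_t lock = dispatch_semaphore_create(1); // acts as a mutex
for (NSInteger index = 0; index < 1000; index++) {
    dispatch_async(concurrentQueue, ^{
        dispatch_semaphore_wait(lock, DISPATCH_TIME_FOREVER); // lock
        [array addObject:[NSString stringWithFormat:@"%ld",(long)index]];
        dispatch_semaphore_signal(lock); // unlock
    });
}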
3. Exploring the underlying principle of GCD dispatch groups
The most direct use of a dispatch group is to control the execution order of tasks. The main APIs:
dispatch_group_create  // create a group
dispatch_group_async   // submit a grouped task
dispatch_group_notify  // notification that the group's tasks have completed
dispatch_group_wait    // wait (with a timeout) for the group's tasks to execute
// enter and leave are usually used in pairs
dispatch_group_enter   // enter the group
dispatch_group_leave   // leave the group
Suppose there are two tasks, and the UI should be updated only after both have completed; a dispatch group fits this case:
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(1);
NSLog(@"task 1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"task 2");
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"dispatch_group_notify task 3");
});
NSLog(@"main thread task 4");
2021-08-04 17:14:46.485864+0800 001-- Function and queue [35918:2084037] task 2
2021-08-04 17:14:46.485864+0800 [35918:2083991] main thread task 4
2021-08-04 17:14:47.490749+0800 001-- Function and queue [35918:2084039] task 1
2021-08-04 17:14:47.491021+0800 001-- Function and queue [35918:2083991] dispatch_group_notify task 3
- [Modification 1] If dispatch_group_notify is moved to the front, will it still execute?
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"dispatch_group_notify task 3");
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(1);
NSLog(@"task 1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"task 2");
dispatch_group_leave(group);
});
NSLog(@"main thread task 4");
2021-08-04 17:17:27.918027+0800 001-- Function and queue [35951:2086835] main thread task 4
2021-08-04 17:17:27.918031+0800 001-- Function and queue [35951:2086993] task 2
2021-08-04 17:17:27.927527+0800 001-- Function and queue [35951:2086835] dispatch_group_notify task 3
2021-08-04 17:17:28.921646+0800 001-- Function and queue [35951:2086996] task 1
Yes. In this run notify even fires before task 1 finishes: when dispatch_group_notify is registered the group's state is still 0 (no enter has happened yet), and, as the source in 3.4 shows, a notify pushed while the state is 0 is woken immediately. In general, enter and leave just need to come in matched pairs; their position relative to notify does not matter.
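A minimal sketch of that empty-group behavior (nothing has entered the group, so the state is 0 and the notify block is scheduled right away):
dispatch_group_t group = dispatch_group_create();
// The group is empty (state 0), so this block is woken immediately
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
    NSLog(@"notify fired on an empty group");
});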
- [Modification 2] If enter : leave is 3 : 2, will notify execute?
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(1);
NSLog(@"task 1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"task 2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"dispatch_group_notify task 3");
});
NSLog(@"main thread task 4");
2021-08-04 17:19:54.681371+0800 001-- Function and queue [35985:2090360] main thread task 4
2021-08-04 17:19:54.681373+0800 001-- Function and queue [35985:2090502] task 2
2021-08-04 17:19:55.684775+0800 001-- Function and queue [35985:2090500] task 1
No. The group still has one outstanding enter, so notify executes only after one more leave arrives.
- [Modification 3] If enter : leave is 2 : 3, will notify execute?
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(1);
NSLog(@"task 1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"task 2");
dispatch_group_leave(group);
});
dispatch_group_leave(group);
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"dispatch_group_notify task 3");
});
NSLog(@"main thread task 4");
It crashes: leave is called one more time than enter. Because the async tasks are delayed, the extra leave on the main thread runs first; once the delayed tasks call their own dispatch_group_leave, the group's value is incremented past zero and libdispatch raises the "Unbalanced call to dispatch_group_leave()" crash.
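The API list above also mentions dispatch_group_wait, which has not appeared in these examples; it blocks the current thread until the group empties or the timeout expires. A minimal sketch (the timeout value is illustrative):
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^{
    sleep(1);
    NSLog(@"task 1");
});
// Block the current thread for at most 2 seconds; returns 0 if the group
// became empty in time, non-zero otherwise
long result = dispatch_group_wait(group,
        dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC));
NSLog(@"dispatch_group_wait returned %ld", result);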
3.1 dispatch_group_create (create a group)
This basically creates the group and sets its properties; the group's initial value is 0.
- Enter the source code of dispatch_group_create:
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
- Enter the _dispatch_group_create_with_count source, which assigns values to the group object's properties and returns the group object; here n equals 0:
DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    // Create the group object; its type is OS_dispatch_group
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    // Assign the group object's properties
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}
3.2 dispatch_group_enter (enter the group)
Entering the dispatch_group_enter source: os_atomic_sub_orig2o performs a -- operation on dg->dg_bits, processing the value:
void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire); // 0 -> -1
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) { // retain on the first enter
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}
3.3 dispatch_group_leave (leave the group)
Entering the dispatch_group_leave source:
- -1 -> 0, i.e. a ++ operation
- Depending on the state, a do-while loop runs and the block tasks are woken up
- If 0 + 1 = 1, enter and leave are unbalanced -- leave was called one time too many -- and the call crashes
void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release); // atomic increment ++
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        return _dispatch_group_wake(dg, old_state, true); // wake the notify blocks
    }

    // If leave is called first, 0 + 1 -> 1, which crashes
    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}
- Enter the _dispatch_group_wake source: a do-while loop walks the notify list and calls _dispatch_continuation_async for each entry:
DISPATCH_NOINLINE
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state,
        bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;

        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags); // execute the block task
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc)); // do-while loop over the async tasks

        refs++;
    }

    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }

    if (refs) _dispatch_release_n(dg, refs); // release the references
}
- Go to the source code of _dispatch_continuation_async:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc); // trace log
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos); // macro, paired with dx_invoke
}
This step matches the block callback execution of an ordinary asynchronous function, so it is not expanded again here.
3.4 dispatch_group_notify (notification)
Entering the source of dispatch_group_notify: if old_state equals 0, the group can be woken immediately:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;

    dsn->dc_data = dq;
    _dispatch_retain(dq);

    // Push onto the notify list and obtain dg's underlying state
    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) { // when the state is 0, wake up
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false); // wake
                });
            }
        });
    }
}
So besides leave, dispatch_group_notify can also trigger _dispatch_group_wake.
- os_mpsc_push_update_tail is a macro used to obtain dg's state code:
#define os_mpsc_push_update_tail(Q, tail, _o_next)  ({ \
    os_mpsc_node_type(Q) _tl = (tail); \
    os_atomic_store2o(_tl, _o_next, NULL, relaxed); \
    os_atomic_xchg(_os_mpsc_tail Q, _tl, release); \
})
3.5 dispatch_group_async
Entering the dispatch_group_async source: it mainly wraps the task and processes it asynchronously:
#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;

    // Wrap the task
    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    // Process the task
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}
#endif
Entering the _dispatch_continuation_group_async source: it mainly wraps the dispatch_group_enter operation:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    dispatch_group_enter(dg); // enter the group
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); // async operation
}
Entering the _dispatch_continuation_async source leads to the ordinary low-level async path. Since there is an enter, there must be a leave; we can assume the leave is performed implicitly after the block executes. Debugging with breakpoints and printing the stack, a search for callers of _dispatch_client_callout lands in _dispatch_continuation_with_group_invoke:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) { // if it is a dispatch group type
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); // block callback
        _dispatch_trace_item_complete(dc);
        dispatch_group_leave((dispatch_group_t)dou); // leave the group
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}
This confirms that dispatch_group_async is, at bottom, an encapsulation of enter and leave.
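In other words, the two forms below are interchangeable -- a self-contained sketch restating the equivalence:
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);

// Form 1: dispatch_group_async
dispatch_group_async(group, queue, ^{
    NSLog(@"task");
});

// Form 2: what the underlying implementation amounts to
dispatch_group_enter(group);
dispatch_async(queue, ^{
    NSLog(@"task");
    dispatch_group_leave(group);
});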
3.6 Summary
- enter and leave only need to come in matched pairs, regardless of how far apart they are
- dispatch_group_enter performs a -- operation on the group's value at the bottom (i.e. 0 -> -1)
- dispatch_group_leave performs a ++ operation on the group's value at the bottom (i.e. -1 -> 0)
- dispatch_group_notify at bottom mainly checks whether the group's state equals 0; when it equals 0, it issues the notification
- A block task can be woken either by dispatch_group_leave or by dispatch_group_notify
- dispatch_group_async is equivalent to enter + leave; its underlying implementation is exactly enter + leave
To sum up, the underlying analysis flow of the dispatch group is shown in the figure below.
4. GCD dispatch_source
Overview
- dispatch_source is an underlying data type used to coordinate the handling of specific low-level system events.
- It replaces asynchronous callback functions for handling system events. When configuring a dispatch source, you specify the events to monitor, the dispatch queue, and the code (block or function) to handle the events; when an event occurs, the dispatch source submits your block or function to the specified queue for execution.
- The general flow: after dispatch_source_merge_data is called on any thread, the handle defined by the dispatch source (which can simply be understood as a block) is executed. This process is called a Custom Event -- a user event, one of the event types a dispatch source can handle.
Characteristics
- Its CPU load is very small, and it consumes almost no resources
- Calling dispatch_source_merge_data from any thread executes the handle the dispatch source defines (which can simply be understood as a block); this is the custom event type that dispatch_source supports
A handle is a pointer that points to another pointer; it refers to a class or structure and is closely tied to the system. There is also a generic handle, written HANDLE. Common kinds of handles include:
1. instance handle HINSTANCE; 2. bitmap handle HBITMAP; 3. device context handle HDC; 4. icon handle HICON
4.1 dispatch_source_create
- Parameters
/*
 type   - the type of event the dispatch source handles
 handle - the underlying system handle (e.g. a file descriptor, mach port,
          signal number, or process id, depending on the type)
 mask   - a mask of flags specifying which events are desired (its meaning
          also depends on the type)
 queue  - the queue to which the event handler block is submitted
 */
dispatch_source_t source = dispatch_source_create(dispatch_source_type_t type,
        uintptr_t handle, unsigned long mask, dispatch_queue_t queue);
- Dispatch source types
The type parameter can be one of the following:
DISPATCH_SOURCE_TYPE_DATA_ADD        custom event; merged data is added
DISPATCH_SOURCE_TYPE_DATA_OR         custom event; merged data is bitwise ORed
DISPATCH_SOURCE_TYPE_MACH_SEND       mach port send
DISPATCH_SOURCE_TYPE_MACH_RECV       mach port receive
DISPATCH_SOURCE_TYPE_MEMORYPRESSURE  memory pressure (note: available from iOS 8)
DISPATCH_SOURCE_TYPE_PROC            process events, e.g. the process exits, creates one or more child threads, or receives a UNIX signal
DISPATCH_SOURCE_TYPE_READ            IO operations, e.g. read readiness on a file or socket
DISPATCH_SOURCE_TYPE_SIGNAL          response when a UNIX signal is received
DISPATCH_SOURCE_TYPE_TIMER           timer
DISPATCH_SOURCE_TYPE_VNODE           file status monitoring: a file is deleted, moved, or renamed
DISPATCH_SOURCE_TYPE_WRITE           IO operations, e.g. write readiness on a file or socket
Note:
- DISPATCH_SOURCE_TYPE_DATA_ADD: when an event is triggered at high frequency, the dispatch source accumulates the merged values by addition and processes them once the system is idle; when the triggers are spread out, the dispatch source responds to each event separately (see the sketch after the method list below)
- DISPATCH_SOURCE_TYPE_DATA_OR: the same kind of custom event, but the merged values are accumulated by bitwise OR
Commonly used methods:
// Suspend / resume a dispatch source (or its queue)
dispatch_suspend(queue);
dispatch_resume(source);

// Merge data into a DATA_ADD / DATA_OR source, triggering its handler.
// You cannot pass 0 (the event would not fire), nor a negative number.
dispatch_source_merge_data

// Set the block that responds to dispatch source events; it runs on the
// queue specified when the source was created
dispatch_source_set_event_handler

// Get the source's pending (coalesced) data
dispatch_source_get_data

// Get the second parameter passed to dispatch_source_create
uintptr_t dispatch_source_get_handle(dispatch_source_t source);

// Get the third parameter passed to dispatch_source_create
unsigned long dispatch_source_get_mask(dispatch_source_t source);

// Cancel the dispatch source's event handling -- the block is no longer
// called. dispatch_suspend, by contrast, merely suspends the source.
void dispatch_source_cancel(dispatch_source_t source);

// Check whether the dispatch source has been cancelled
long dispatch_source_testcancel(dispatch_source_t source);

// Block called when the dispatch source is cancelled; typically used to
// release related resources
void dispatch_source_set_cancel_handler(dispatch_source_t source,
        dispatch_block_t cancel_handler);

// Block called when the dispatch source is first started (released after it
// runs); it can also be set at any time while the source is running
void dispatch_source_set_registration_handler(dispatch_source_t source,
        dispatch_block_t registration_handler);
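Tying DISPATCH_SOURCE_TYPE_DATA_ADD and dispatch_source_merge_data together, a common pattern is coalescing frequent background updates into batched handler invocations on the main queue -- a minimal sketch (the loop bound and progress counter are illustrative):
// Values merged from any thread are ADDed together and delivered
// in batches on the main queue
dispatch_source_t source = dispatch_source_create(
        DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
__block NSUInteger totalProgress = 0;
dispatch_source_set_event_handler(source, ^{
    // dispatch_source_get_data returns the sum merged since the last callback
    totalProgress += dispatch_source_get_data(source);
    NSLog(@"progress: %lu", (unsigned long)totalProgress);
});
dispatch_resume(source);

dispatch_async(dispatch_get_global_queue(0, 0), ^{
    for (NSUInteger i = 0; i < 100; i++) {
        dispatch_source_merge_data(source, 1); // triggers the handler
    }
});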
Example (source type DISPATCH_SOURCE_TYPE_TIMER):
__block int timeout = 3;
dispatch_queue_t globalQueue = dispatch_get_global_queue(0, 0);
// Create the timer source
dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
        0, 0, globalQueue);
/*
 Set the timer to first fire now, then every 1s, with 0s leeway.
 - source   the dispatch source
 - start    when the timer first fires; the parameter type is dispatch_time_t,
            an opaque type we cannot manipulate directly -- create it with the
            dispatch_time or dispatch_walltime functions. The constants
            DISPATCH_TIME_NOW and DISPATCH_TIME_FOREVER are often useful.
 - interval the firing interval
 - leeway   the accuracy (slack) allowed for the timer
 */
dispatch_source_set_timer(timer, dispatch_walltime(NULL, 0),
        1.0 * NSEC_PER_SEC, 0);
// Event callback
dispatch_source_set_event_handler(timer, ^{
    if (timeout <= 0) {
        // The countdown has finished; cancel the dispatch source
        dispatch_source_cancel(timer);
    } else {
        timeout--;
        NSLog(@"countdown - %d", timeout);
    }
});
// Start the dispatch source
dispatch_resume(timer);
Q: How does a dispatch_source_t timer compare with NSTimer and CADisplayLink?
- NSTimer
- Subject to delays, and tied to the RunLoop and the RunLoop mode (if the RunLoop is busy with a continuous operation, the timer fires late)
- Must be added to a RunLoop manually, with the mode set to NSRunLoopCommonModes (in the default NSDefaultRunLoopMode, touch events pause the timer)
- CADisplayLink
- Invoked on every screen refresh; a timer class for drawing content to the screen at a frequency synchronized with the screen refresh rate
- Normally called at the end of each refresh, so it is fairly accurate
- If the invoked method is time-consuming and exceeds the screen refresh period, several callback opportunities are skipped
- dispatch_source_t timer
- Accurate timing, and it can run on a background thread, avoiding UI stalls caused by timers on the main thread
- Does not rely on the RunLoop; it is driven by the system kernel and highly accurate
4.2 dispatch_source_create source
dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
        unsigned long mask, dispatch_queue_t dq)
{
    dispatch_source_refs_t dr;
    dispatch_source_t ds;

    dr = dux_create(dst, handle, mask)._dr;
    if (unlikely(!dr)) {
        return DISPATCH_BAD_INPUT;
    }

    ds = _dispatch_queue_alloc(source,
            dux_type(dr)->dst_strict ? DSF_STRICT : DQF_MUTABLE, 1,
            DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER)._ds;
    ds->dq_label = "source";
    ds->ds_refs = dr;
    dr->du_owner_wref = _dispatch_ptr2wref(ds);

    if (unlikely(!dq)) {
        dq = _dispatch_get_default_queue(true);
    } else {
        _dispatch_retain((dispatch_queue_t _Nonnull)dq);
    }
    ds->do_targetq = dq;

    if (dr->du_is_timer && (dr->du_timer_flags & DISPATCH_TIMER_INTERVAL)) {
        dispatch_source_set_timer(ds, DISPATCH_TIME_NOW, handle, UINT64_MAX);
    }
    _dispatch_object_debug(ds, "%s", __func__);
    return ds;
}
4.3 dispatch_source_set_timer source
DISPATCH_NOINLINE
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
        uint64_t interval, uint64_t leeway)
{
    dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
    dispatch_timer_config_t dtc;

    if (unlikely(!dt->du_is_timer)) {
        DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
    }

    if (dt->du_timer_flags & DISPATCH_TIMER_INTERVAL) {
        dtc = _dispatch_interval_config_create(start, interval, leeway, dt);
    } else {
        dtc = _dispatch_timer_config_create(start, interval, leeway, dt);
    }
    if (_dispatch_timer_flags_to_clock(dt->du_timer_flags) != dtc->dtc_clock &&
            dt->du_filter == DISPATCH_EVFILT_TIMER_WITH_CLOCK) {
        DISPATCH_CLIENT_CRASH(0, "Attempting to modify timer clock");
    }

    _dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
    dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
    if (dtc) free(dtc);
    dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
4.4 dispatch_source_set_event_handler source
#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true);
}
DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, void *func,
uintptr_t kind, bool is_block)
{
dispatch_continuation_t dc;
dc = _dispatch_source_handler_alloc(ds, func, kind, is_block);
if (_dispatch_lane_try_inactive_suspend(ds)) {
_dispatch_source_handler_replace(ds, kind, dc);
return _dispatch_lane_resume(ds, DISPATCH_RESUME);
}
dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds);
if (unlikely(dqf & DSF_STRICT)) {
DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source "
"after it has been activated");
}
// Ignore handlers mutations past cancelation, it's harmless
if ((dqf & DSF_CANCELED) == 0) {
_dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
if (kind == DS_REGISTN_HANDLER) {
_dispatch_bug_deprecated("Setting registration handler after "
"the source has been activated");
} else if (func == NULL) {
_dispatch_bug_deprecated("Clearing handler after "
"the source has been activated");
}
}
dc->dc_data = (void *)kind;
_dispatch_barrier_trysync_or_async_f(ds, dc,
_dispatch_source_set_handler_slow, 0);
}
4.5 dispatch_resume source
void
dispatch_resume(dispatch_object_t dou)
{
    DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
    if (unlikely(_dispatch_object_is_global(dou) ||
            _dispatch_object_is_root_or_base_queue(dou))) {
        return;
    }
    if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
        _dispatch_lane_resume(dou._dl, DISPATCH_RESUME);
    }
}
4.6 dispatch_source_cancel source
void
dispatch_source_cancel(dispatch_source_t ds)
{
    _dispatch_object_debug(ds, "%s", __func__);
    // Right after we set the cancel flag, someone else
    // could potentially invoke the source, do the cancellation,
    // unregister the source, and deallocate it. We would
    // need to therefore retain/release before setting the bit
    _dispatch_retain_2(ds);

    if (_dispatch_queue_atomic_flags_set_orig(ds, DSF_CANCELED) & DSF_CANCELED) {
        _dispatch_release_2_tailcall(ds);
    } else {
        dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
    }
}