Barrier function

The most direct function of fence function is to control the execution sequence and synchronization of tasks.

  • Dispatch_barrier_async is not sent until the preceding tasks are completed
  • Dispatch_barrier_sync does the same, but blocks the thread and affects subsequent tasks
  • One very important point: the fence function can only control the same concurrent queue

Application of fence function

During development we should all have a situation where multiple requests are executed concurrently, and when all the requests are completed, we’re all working on one thing. Let’s look at some examples of fence functions.

  • Case 1

  • Case 2

  • Case 3

  • Case 4
*/ - (void)demo3{// Threadsafe? dispatch_queue_t concurrentQueue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT); Marray for (int I = 0; i<1000; i++) { dispatch_async(concurrentQueue, ^{ NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)]; NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil]; NSData *data = [NSData dataWithContentsOfURL:url]; UIImage *image = [UIImage imageWithData:data]; dispatch_barrier_async(concurrentQueue , ^{ [self.mArray addObject:image]; }); }); }}Copy the code

As can be seen from the above case, dispatch_barrier_async in case 1 only blocks tasks in the same queue and does not affect the execution of task 5. But case 2 using dispatch_barrier_sync blocks all subsequent tasks, including task 5. Case 3 did not block when we changed to a global concurrent queue. In case 4, it’s not safe for multiple threads to operate on Marray, because when you add elements to Marray, you’re essentially releasing the old value, retaining the new value, and in the case of multiple threads, two threads are releasing marray at the same time, When another thread accesses marray at this time, bad address access will occur, so an error will occur. The fence function solves this problem. Finally, we explore the underlying implementation principle of the fence function with two questions.

  • Why can the barrier function play a barrier role
  • Why does the global queue fence function not work

The underlying principle of fence function

Here we look at the underlying logic of the fence function. First we search for dispatch_barrier_async in the libdispatch.dylib source code. Then we follow the dispatch_barrier_SYNC -> _dispatch_barrier_sync_f -> _dispatch_barrier_sync_F_inline, Finally, we come to the _dispatch_barrier_sync_F_inline function.

Here we can see that the code is very similar to the synchronization function we saw earlier, but it may also execute the _dispatch_sync_F_slow function and also deadlock. There’s just a DC_FLAG_BARRIER. Next, let’s look at the implementation of _dispatch_SYNc_RECURse.

Before we execute the fence function, we have to clear the queue of tasks to make sure there are no other tasks in the queue. When this is done, the _dispatch_sync_INVOke_and_COMPLEte_RECURse function is executed. _dispatch_sync_invoke_and_complete_recurse – > _dispatch_sync_complete_recurse, We follow the flow to the _dispatch_sync_complete_RECURse function.

The _dispatch_lane_non_barrier_complete function tells the system it’s done and fixes the status. So here we look at dx_wakeup.

Here we see the custom serial queue, the dQ_wakeup assignment for the concurrent queue is a little different from the global concurrent queue, so let’s look at _dispatch_root_queue_wakeup first.

Next we go to the _dispatch_lane_barrier_complete function.

_dispatch_lane_class_barrier_complete (dispatch_lane_class_barrier_complete)

So far we know how the barrier function can block tasks, but we still have a problem, which is why the global concurrent queue barrier function is invalid. Let’s look at it here.

When we search for _dispatch_root_queue_wakeup we can see that barrier is not handled, which is why the barrier function is invalid. The reason for apple’s design is that global concurrent queues are shared globally and may also be used by system tasks, which will be affected if the fence function is in effect.

A semaphore

  • Dispatch_semaphore_create Creates a semaphore
  • Dispatch_semaphore_wait Semaphore wait
  • Dispatch_semaphore_signal Indicates signal release

Synchronize -> When lock, control the maximum number of concurrent GCD

Use of semaphores

  • Case 1
dispatch_queue_t queue = dispatch_get_global_queue(0, 0); dispatch_semaphore_t sem = dispatch_semaphore_create(2); // dispatch_async(queue, ^{dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); sleep(2); NSLog(@" Perform task 1"); NSLog(@" Task 1 completed "); dispatch_semaphore_signal(sem); }); // dispatch_async(queue, ^{dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); sleep(2); NSLog(@" Perform task 2"); NSLog(@" Task 2 completed "); dispatch_semaphore_signal(sem); }); // dispatch_async(queue, ^{dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); sleep(2); NSLog(@" Perform task 3"); NSLog(@" Task 3 completed "); dispatch_semaphore_signal(sem); }); // dispatch_async(queue, ^{dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); sleep(2); NSLog(@" Perform task 4"); NSLog(@" Task 4 completed "); dispatch_semaphore_signal(sem); });Copy the code

From case 1 we can see that we set the number of concurrent threads to 2 by semaphore, and from print we can see that only task 1 and task 2 are executed at the same time, task 3 and task 4 are executed two seconds later. In our normal development process, data can be uploaded in this way.

  • Case 2

In case 2, we can see that dispatch_semaphore_wait and dispatch_semaphore_signal can be used together to achieve synchronization effect. Task 1 can only be executed when task 2 is completed.

Semaphore underlying principles

Now that we know what semaphores do, let’s look at the underlying principles of semaphores.

  • dispatch_semaphore_create

As you can see from the comments, this will only work if value is greater than or equal to 0, and it will work if value is less than 0.

  • dispatch_semaphore_wait
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}
Copy the code

The os_atomic_dec2O function is used to decrease dsemA_value by 1. Likely (value >= 0) So the _dispatch_semaphore_wait_slow function is executed.

static intptr_t _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long orig; _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); switch (timeout) { default: if (! _dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { break; } // Fall through and try to undo what the fast path did to // dsema->dsema_value case DISPATCH_TIME_NOW: orig = dsema->dsema_value; while (orig < 0) { if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) { return _DSEMA4_TIMEOUT(); } } // Another thread called semaphore_signal(). // Fall through and drain the wakeup. case DISPATCH_TIME_FOREVER: _dispatch_sema4_wait(&dsema->dsema_sema); break; } return 0; }Copy the code

If we send dispatch_semaphore_wait with DISPATCH_TIME_NOW, If it is DISPATCH_TIME_FOREVER then _dispatch_sema4_wait is executed. Our usual parameter is DISPATCH_TIME_FOREVER.

void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
	int ret = 0;
	do {
		ret = sem_wait(sema);
	} while (ret == -1 && errno == EINTR);
	DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}
Copy the code

Sem_wait is a low-level encapsulation of pthread, so we don’t need to worry too much about sem_wait. Dispatch_semaphore_wait (sem, DISPATCH_TIME_FOREVER) can be seen as do {} while loop.

  • dispatch_semaphore_signal
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}
Copy the code

Os_atomic_inc2o = dsemA_value = 1; likely(value > 0) = likely(value > 0) There are too many dispatch_semaphore_WAIT operations and there is no one-to-one correspondence with dispatch_semaphore_signal. _dispatch_semaphoRE_signal_slow handles exceptions and increments dSEMA_value by 1 until it is greater than 0.

Scheduling group

The most direct function of a scheduling group is to control the execution sequence of tasks

  • Dispatch_group_create create a group
  • Dispatch_group_async Group task
  • Dispatch_group_notify Notifies of group task completion
  • Dispatch_group_wait Time for executing group tasks
  • Dispatch_group_enter into groups
  • Dispatch_group_leave Exit group

Scheduling group applications

  • case

For example, in the above case, we request multiple images concurrently and add the asynchronous task to the scheduling group. After all the tasks in the scheduling group are completed, dispatch_group_notify will be called. We combine multiple images here.

Underlying principles of scheduling groups

Here we need to explore the underlying principle of scheduling groups with a few questions.

  • Enter group and go out group why should tie-in use, want advanced hind go out

  • How does the scheduling group control the flow

  • As shown in the case, why dispatch_group_async = dispatch_group_enter + dispatch_group_leave

  • dispatch_group_create

dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}
Copy the code
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
	dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
			sizeof(struct dispatch_group_s));
	dg->do_next = DISPATCH_OBJECT_LISTLESS;
	dg->do_targetq = _dispatch_get_default_queue(false);
	if (n) {
		os_atomic_store2o(dg, dg_bits,
				(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
	}
	return dg;
}
Copy the code

Os_atomic_store2o specifies the name of the group to be used to create a semaphore.

  • dispatch_group_enter
void dispatch_group_enter(dispatch_group_t dg) { // The value is decremented on a 32bits wide atomic so that the carry // for the 0 -> -1 transition is not propagated to the upper 32bits. uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits, DISPATCH_GROUP_VALUE_INTERVAL, acquire); uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK; if (unlikely(old_value == 0)) { _dispatch_retain(dg); // <rdar://problem/22318411> } if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) { DISPATCH_CLIENT_CRASH(old_bits, "Too many nested calls to dispatch_group_enter()"); }}Copy the code

Here you can see that, like the semaphore, the initial value is decrement to -1, but unlike the semaphore there is no _dispatch_sema4_wait function.

  • dispatch_group_leave
void
dispatch_group_leave(dispatch_group_t dg)
{
	// The value is incremented on a 64bits wide atomic so that the carry for
	// the -1 -> 0 transition increments the generation atomically.
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release);
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
		do {
			new_state = old_state;
			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			} else {
				// If the group was entered again since the atomic_add above,
				// we can't clear the waiters bit anymore as we don't know for
				// which generation the waiters are for
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			}
			if (old_state == new_state) break;
		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
				old_state, new_state, &old_state, relaxed)));
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}
Copy the code

Old_state equals zero, and then goes to if (Unlikely (OLd_value == DISPATCH_GROUP_VALUE_1)), and then executes _dispatch_group_wake, The reason for executing _dispatch_group_wake is that asynchronous tasks are time-consuming. Dispatch_group_leave (group) is executed before dispatch_group_notify. This is to eliminate the effect of multithreading.

  • dispatch_group_notify
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_continuation_t dsn) { uint64_t old_state, new_state; dispatch_continuation_t prev; dsn->dc_data = dq; _dispatch_retain(dq); prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next); if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg); os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next); if (os_mpsc_push_was_empty(prev)) { os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, { new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS; if ((uint32_t)old_state == 0) { os_atomic_rmw_loop_give_up({ return _dispatch_group_wake(dg, new_state, false); }); }}); }}Copy the code

Here, layer upon layer judgment will be made. When OLD_state == 0, _dispatch_group_WAKE function will be executed, which is the weak_UP process (block callout process) of synchronous function or asynchronous function.

  • dispatch_group_async
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
	_dispatch_continuation_group_async(dg, dq, dc, qos);
}
Copy the code
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{
	dispatch_group_enter(dg);
	dc->dc_data = dg;
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
Copy the code

Where is the dispatch_group_leave function executed? Let’s continue with the process.

_dispatch_continuation_async(dispatch_queue_class_t dqu, dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags) { #if DISPATCH_INTROSPECTION if (! (dc_flags & DC_FLAG_NO_INTROSPECTION)) { _dispatch_trace_item_push(dqu, dc); } #else (void)dc_flags; #endif return dx_push(dqu._dq, dc, qos); }Copy the code
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
Copy the code
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
	.do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
	.do_dispose     = _dispatch_object_no_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_object_no_invoke,

	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_root_queue_wakeup,
	.dq_push        = _dispatch_root_queue_push,
);
Copy the code

_dispatch_root_queue_push -> _dispatch_root_queue_push_inline -> _dispatch_root_queue_poke -> _dispatch_root_queue_poke_slow -> _dispatch_root_queues_init -> _dispatch_root_queues_init_once -> _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> _dispatch_continuation_invoke_inline – > _dispatch_continuation_with_group_invoke. We follow the flow to the implementation of the _dispatch_continuation_with_group_invoke function.

_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc) { struct dispatch_object_s *dou = dc->dc_data; unsigned long type = dx_type(dou); if (type == DISPATCH_GROUP_TYPE) { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_trace_item_complete(dc); dispatch_group_leave((dispatch_group_t)dou); } else { DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type"); }}Copy the code

Here you can see a call to dispatch_group_leave((dispatch_group_t)dou) This is why dispatch_group_async = dispatch_group_enter + dispatch_group_leave.

Dispatch_Source

* Its CPU load is very small and tries not to take up resources * the advantage of joins

Call dispatch_source_merge_data on any thread, and execute the Dispatch Source’s pre-defined handle (which can be simply interpreted as a block) called Custom Event. Is the type of event that a Dispatch source supports.

A handle is a pointer that points to a pointer that points to a class or structure that has a very close relationship to the system HINSTANCE, HBITMAP, HDC, HICON, etc. There is also a generic HANDLE called HANDLE.

  • dispatch_sourceCommon functions:
  • Dispatch_source_create create the source
  • Dispatch_source_set_event_handler Sets the source event callback
  • Dispatch_source_merge_data Source event setting data

Dispatch_source_get_data Obtains source event data

  • Dispatch_resume continue

  • Dispatch_suspend hang

  • case

- (void)viewDidLoad { [super viewDidLoad]; self.totalComplete = 0; self.queue = dispatch_queue_create("chenxi.com", NULL); Self. source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue()); Self. source, ^{NSLog(@"%@",[NSThread currentThread]); NSUInteger value = dispatch_source_get_data(self.source); self.totalComplete += value; NSLog (@ "progress: % 2 f, the self. The totalComplete / 100.0); Self. ProgressView. Progress = self. TotalComplete / 100.0; }); self.isRunning = YES; dispatch_resume(self.source); } / / here by clicking on the event controls totalComplete add operation - IBAction didClickStartOrPauseAction: (id) sender {the if (self. Set) { dispatch_suspend(self.source); dispatch_suspend(self.queue); NSLog(@" paused "); self.isRunning = NO; [sender setTitle: @ "pause in.." forState: UIControlStateNormal]; }else{ dispatch_resume(self.source); dispatch_resume(self.queue); NSLog(@" executed "); self.isRunning = YES; [sender setTitle: @ "pause in.." forState: UIControlStateNormal]; }} /** Perform dispatch_source_get_data(self.source) 100 times. */ - (void)touchesBegan:(NSSet< uittouch *> *)touches withEvent:(UIEvent *)event{ NSLog(at sign "started "); for (int i= 0; i<100; i++) { dispatch_async(self.queue, ^{ if (! Self. isRunning) {NSLog(@" paused "); return; } sleep(1); dispatch_source_merge_data(self.source, 1); }); }}Copy the code

Here we can see a use case for dispatch_source. The advantage of dispatch_source is that it is based on pThread encapsulation and is not affected by runloop.

Add: Mutable array is not secure cause analysis

// Is mutable array thread safe? dispatch_queue_t concurrentQueue = dispatch_queue_create("chenxi", DISPATCH_QUEUE_CONCURRENT); Marray for (int I = 0; i<1000; i++) { dispatch_async(concurrentQueue, ^{ NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)]; NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil]; NSData *data = [NSData dataWithContentsOfURL:url]; UIImage *image = [UIImage imageWithData:data]; [self.mArray addObject:image]; }); }Copy the code

For example, if the current mArray data is @[@(1)], when we have multiple threads at the same time to insert the second position of the current array, there will be operations on the same memory space at the same time, which will cause errors.