iOS底层学习 - 多线程之GCD底层原理篇

7,626 阅读14分钟

经过前几章的学习,我们对GCD的使用和队列的原理有了基本的了解,但是GCD底层到底是如何开辟线程,如何执行函数等还不是很清楚,本章就来一探究竟。

系列文章传送门:

iOS底层学习 - 多线程之基础原理篇

iOS底层学习 - 多线程之GCD初探

iOS底层学习 - 多线程之GCD队列原理篇

iOS底层学习 - 多线程之GCD应用篇

对于GCD的底层来说,主要有队列创建,函数执行,同步异步原理和其他应用函数的原理。关于队列原理的,我们之前的篇章已经讲过,相信对于GCD是如何创建队列的,已经有了认识,今天就来继续看其他的底层原理,还是通过源码来深入研究

同步执行dispatch_sync

死锁的原因

我们都知道,当使用dispatch_sync在串行队列上执行时,会形成dispatch_sync块任务和内部执行任务的相互等待,从而造成死锁崩溃。那么我就从这个问题来触发,看一下为什么会造成死锁,从而了解同步执行的原理

还是老规矩,talk is cheap,show me the code

我们通过源码找到了dispatch_sync的调用如下,由于unlikely一般运行的较少,多为容错处理,所以我们先跟主流程,最终来到了函数_dispatch_sync_f_inline

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

-----------------------------------------------------------------------------------------

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		uintptr_t dc_flags)
{
	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

-----------------------------------------------------------------------------------------

_dispatch_sync_f_inline中发现了一个判断likely(dq->dq_width == 1,通过之前队列的原理我们可以知道,串行队列的width是为1的,所以串行的执行方法,是在_dispatch_barrier_sync_f中的。

而且根据函数名,我们可以知道_dispatch_barrier是之前讲的栅栏函数的调用,所以说栅栏函数也会走到此方法中。

由于我们先找死锁的原因,所以在这里就先不看下面并发的逻辑了。

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	
	✅// 串行 来到这里
	if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
	}

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
	}

	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

-----------------------------------------------------------------------------------------
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

最终,我们来到了_dispatch_barrier_sync_f_inline函数中。

首先执行了_dispatch_tid_self方法。通过源码跟踪,我们可以发现其为宏定义的方法,底层主要执行了_dispatch_thread_getspecific。这个函数书主要是通过KeyValue的方式来获取线程的一些信息。在这里就是获取当前线程的tid,即唯一ID。

我们知道,造成死锁的原因就是串行队列上任务的相互等待。那么必然会通过tid来判断是否满足条件,从而找到了_dispatch_queue_try_acquire_barrier_sync函数

#define _dispatch_tid_self()		((dispatch_tid)_dispatch_thread_port())

#define _dispatch_thread_port() ((mach_port_t)(uintptr_t)\
		_dispatch_thread_getspecific(_PTHREAD_TSD_SLOT_MACH_THREAD_SELF))

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	✅// 获取线程ID -- mach pthread --
	dispatch_tid tid = _dispatch_tid_self();

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// The more correct thing to do would be to merge the qos of the thread
	// that just acquired the barrier lock into the queue state.
	//
	// However this is too expensive for the fast path, so skip doing it.
	// The chosen tradeoff is that if an enqueue on a lower priority thread
	// contends with this fast path, this thread may receive a useless override.
	//
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE// 死锁
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
				DC_FLAG_BARRIER | dc_flags);
	}

	if (unlikely(dl->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func,
				DC_FLAG_BARRIER | dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
			DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
					dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

在函数_dispatch_queue_try_acquire_barrier_sync_and_suspend中,从该函数我们可以知道,通过os_atomic_rmw_loop2o函数回调,从OS底层获取到了状态信息,并返回。

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
	return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}
-----------------------------------------------------------------------------------------
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
		uint32_t tid, uint64_t suspend_count)
{
	uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
	uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
			_dispatch_lock_value_from_tid(tid) |
			(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
	uint64_t old_state, new_state;
	✅// 从底层获取信息 -- 状态信息 - 当前队列 - 线程
	return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
		uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
		if (old_state != (init | role)) {
			os_atomic_rmw_loop_give_up(break);
		}
		new_state = value | role;
	});
}

那么返回之后,就执行了_dispatch_sync_f_slow函数。通过下图崩溃堆栈我们也可以从侧方面验证。

其中通过源码可以发现,首先是生成了一些任务的信息,然后通过_dispatch_trace_item_push来进行压栈操作,从而存放在我们的同步队列中(FIFO),从而实现函数的执行。

_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	...
	pthread_priority_t pp = _dispatch_get_priority();
	struct dispatch_sync_context_s dsc = {
		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func     = _dispatch_async_and_wait_invoke,
		.dc_ctxt     = &dsc,
		.dc_other    = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher  = _voucher_get(),
		.dsc_func    = func,
		.dsc_ctxt    = ctxt,
		.dsc_waiter  = _dispatch_tid_self(),
	};
	

	_dispatch_trace_item_push(top_dq, &dsc);
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	...
}

那么产生死锁的主要检测就再__DISPATCH_WAIT_FOR_QUEUE__这个函数中了,通过查看函数,发现它会获取到队列的状态,看其是否为等待状态,然后调用_dq_state_drain_locked_by中的异或运算,判断队列和线程的等待状态,如果两者都在等待,那么就会返回YES,从而造成死锁的崩溃。

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    // 获取队列的状态,看是否是处于等待状态
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
    	DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
    			"dispatch_sync called on queue "
    			"already owned by current thread");
    }
    ...
}

-----------------------------------------------------------------------------------------

static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{   // lock_value 为队列状态,tid 为线程 id
    // ^ (异或运算法) 两个相同就会出现 0 否则为1
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

小结一下

  1. _dispatch_sync首先获取当前线程的tid
  2. 获取到系统底层返回的status
  3. 获取到队列的等待状态和tid比较,如果相同,则表示正在死锁,从而崩溃

block任务的执行

对于同步任务的block执行,我们在继续跟进之前的源码_dispatch_sync源码中_dispatch_barrier_sync_f_inline函数,观看其函数实现,函数的执行主要是在_dispatch_client_callout方法中。

DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
		void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    ...
}
-----------------------------------------------------------------------------------------

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	// f(ctxt) -- func(ctxt)
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_frame_pop(&dtf);
}

查看_dispatch_client_callout方法,里面果然有函数的调用f(ctxt);

至此,同步函数的block调用完成

_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
	_dispatch_get_tsd_base();
	void *u = _dispatch_get_unwind_tsd();
	if (likely(!u)) return f(ctxt);
	_dispatch_set_unwind_tsd(NULL);
	f(ctxt);
	_dispatch_free_unwind_tsd();
	_dispatch_set_unwind_tsd(u);
}

小结一下

同步函数的block调用步骤

dispatch_sync  
└──_dispatch_barrier_sync_f_inline
    └──_dispatch_sync_invoke_and_complete
        └──_dispatch_sync_function_invoke_inline
           └──_dispatch_client_callout
              └──f(ctxt);

异步执行dispatch_async

看完了同步执行的相关源码,下面我们来看异步的执行就简单多了。

查看其源码,主要执行了两个函数_dispatch_continuation_init_dispatch_continuation_async,下面我们一个个来看一下

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

_dispatch_continuation_init

通过源码我们可知,这个函数dispatch_qos_t这个对象,里面的实现必然是对其进行初始化赋值的操作。

通过_dispatch_Block_invoke的宏定义,我们可以发现其对传入的dispatch_block_t回调参数进行了封装。

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, dispatch_block_t work,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	void *ctxt = _dispatch_Block_copy(work);

	dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		dc->dc_flags = dc_flags;
		dc->dc_ctxt = ctxt;
		// will initialize all fields but requires dc_flags & dc_ctxt to be set
		return _dispatch_continuation_init_slow(dc, dqu, flags);
	}

	dispatch_function_t func = _dispatch_Block_invoke(work);
	if (dc_flags & DC_FLAG_CONSUME) {
		func = _dispatch_call_block_and_release;
	}
	return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

-----------------------------------------------------------------------------------------

#define _dispatch_Block_invoke(bb) \
		((dispatch_function_t)((struct Block_layout *)bb)->invoke)

最终的封装会在_dispatch_continuation_init_f中,其代码也非常的简单,仍旧是函数式保存的赋值的相关操作,对回调等也进行了封装保存。

而进行封装保存的意义也很简单:因为异步需要在合适的时机进行线程回调block

DISPATCH_ALWAYS_INLINE
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	pthread_priority_t pp = 0;
	dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
	dc->dc_func = f;
	dc->dc_ctxt = ctxt;
	// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
	// should not be propagated, only taken from the handler if it has one
	if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
		pp = _dispatch_priority_propagate();
	}
	_dispatch_continuation_voucher_set(dc, flags);
	return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

_dispatch_continuation_async

我们知道了上一步对信息进行函数式封装,那么对于一个异步执行来说,最重要的就是何时创建线程和函数执行呢,那么就再这个方法里面了。

查看方法,发现实现非常的简单,但是越简单的东西,其内里就越复杂。这个方法主要就是执行了dx_push方法,查看其代码,发现为宏定义,主要执行了dq_push方法.

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
		_dispatch_trace_item_push(dqu, dc);
	}
#else
	(void)dc_flags;
#endif
	return dx_push(dqu._dq, dc, qos);
}
-----------------------------------------------------------------------------------------

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

那么dq_push又是怎么赋值的呢,由于其是一个属性,所以我们可以搜索.dq_pus来查看其赋值。我们发现其赋值的地方非常多,但是大体的意思我们可以理解,就是主要在根队列,自定义队列,主队列等等进行push操作的时候调用。

我们知道线程的创建一般都是在根队列上进行创建的,所以我们直接找根队列的dq_push赋值,这样比较快速,当然其他的也可以,最终都会走到这里。

我们发现_dispatch_root_queue_push方法最终会调用_dispatch_root_queue_push_inline方法,而_dispatch_root_queue_push_inline方法最终又会调用_dispatch_root_queue_poke

_dispatch_root_queue_poke这个函数主要进行了一些容错的判断,最终走到了_dispatch_root_queue_poke_slow相关的方法里

void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
	if (unlikely(ddi && ddi->ddi_can_stash)) 
	...一些不重要的操作 ...
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
		return _dispatch_root_queue_push_override(rq, dou, qos);
	}
#else
	(void)qos;
#endif
	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
-----------------------------------------------------------------------------------------
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
		return _dispatch_root_queue_poke(dq, n, 0);
	}
}
-----------------------------------------------------------------------------------------
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
	if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
	{
		if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
			_dispatch_root_queue_debug("worker thread request still pending "
					"for global queue: %p", dq);
			return;
		}
	}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
	return _dispatch_root_queue_poke_slow(dq, n, floor);
}

_dispatch_root_queue_poke_slow

这个方法就是异步执行的主要方法,创建线程也是在此,由于代码比较长,我们还是寻找代码中的关键节点来讲。

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    ...
    ✅//队列初始化,runtime强转等操作,防止类型无法匹配等情况
    _dispatch_root_queues_init();
	_dispatch_debug_root_queue(dq, __func__);
	_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
	
	...
	
    int can_request, t_count;
	// seq_cst with atomic store to tail <rdar://problem/16932833>// 获取线程池的大小
	t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
	do {
	    ✅// 计算可以请求的数量
		can_request = t_count < floor ? 0 : t_count - floor;
		if (remaining > can_request) {
			_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
					remaining, can_request);
			os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
			remaining = can_request;
		}
		if (remaining == 0) {
		    // 线程池无可用将会报错
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
	} while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
			t_count - remaining, &t_count, acquire));

	pthread_attr_t *attr = &pqc->dpq_thread_attr;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
	if (unlikely(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq); // released in _dispatch_worker_thread
		✅✅✅//开辟线程✅✅✅
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
	} while (--remaining);
#else
	(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
	
}

-----------------------------------------------------------------------------------------

#define _dispatch_trace_runtime_event(evt, ptr, value) \
		_dispatch_introspection_runtime_event(\
				dispatch_introspection_runtime_event_##evt, ptr, value)

根据代码可以知道,系统会获取线程池总数量和可以创建的数量,然后通过两个do while来进行动态的开辟线程。

单例dispatch_once

通过dispatch_once函数查看其底层调用,可以发现其最终调用到dispatch_once_f方法中。相关的代码如下。

  1. 首先我们知道val一开始为NULL,并将其转换为dispatch_once_gate_t
  2. 通过查看_dispatch_once_gate_tryenter源码,我们知道其在OS底层通过判断l->dgo_once是否为DLOCK_ONCE_UNLOCKED状态
  3. 如果成立,则会执行_dispatch_once_callout函数。执行对应的block,然后将l->dgo_once置为DLOCK_ONCE_DONE,从而保证了只执行一次
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	// 如果你来过一次 -- 下次就不来
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;
	//DLOCK_ONCE_DONE
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	if (likely(v == DLOCK_ONCE_DONE)) {
		return;
	}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
#endif
#endif

	// 满足条件 -- 试图进去
	if (_dispatch_once_gate_tryenter(l)) {
		// 单利调用 -- v->DLOCK_ONCE_DONE
		return _dispatch_once_callout(l, ctxt, func);
	}
	return _dispatch_once_wait(l);
}
-----------------------------------------------------------------------------------------

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	// os 对象是否存储过
	// unlock
	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
			(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}

-----------------------------------------------------------------------------------------

_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
		dispatch_function_t func)
{
	// block()
	_dispatch_client_callout(ctxt, func);
	_dispatch_once_gate_broadcast(l);
}

信号量dispatch_semaphore

dispatch_semaphore_create

这个方法比较明确,就是函数式保存,转换成了dispatch_semaphore_t对象。信号量的处理都是基于此对象来进行的。

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
    dispatch_semaphore_t dsema;
    // 如果 value 小于 0 直接返回 0
    if (value < 0) {
    	return DISPATCH_BAD_INPUT;
    }
    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
    		sizeof(struct dispatch_semaphore_s));
    dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    dsema->do_targetq = _dispatch_get_default_queue(false);
    dsema->dsema_value = value;
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    dsema->dsema_orig = value;
    return dsema;
}

dispatch_semaphore_wait

wait函数主要进行了3步操作:

  1. 调用os_atomic_dec2o宏。通过对这个宏的查看,我们发现其就是一个对dsema进行原子性的-1操作
  2. 判断value是否>= 0,如果满足条件,则不阻塞,直接执行
  3. 调用_dispatch_semaphore_wait_slow。通过源码,我们可以发现其对timeout的参数进行了分别的处理
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
	    return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}

#define os_atomic_dec2o(p, f, m) \
		os_atomic_sub2o(p, f, 1, m)
		
#define os_atomic_sub2o(p, f, v, m) \
		os_atomic_sub(&(p)->f, (v), m)
		
#define os_atomic_sub(p, v, m) \
		_os_atomic_c11_op((p), (v), m, sub, -)

_dispatch_semaphore_wait_slow函数的处理如下:

  1. default:主要调用了_dispatch_sema4_timedwait方法,这个方法主要是判断当前的操作是否超过指定的超时时间。
  2. DISPATCH_TIME_NOW中的while是一定会执行的,如果不满足条件,已经在之前的操作跳出了,不会执行到此。if操作调用os_atomic_cmpxchgvw2o,会将value进行+1,跳出阻塞,并返回_DSEMA4_TIMEOUT超时
  3. DISPATCH_TIME_FOREVER中即调用_dispatch_sema4_wait,表示会一直阻塞,知道等到single加1变为0为止,跳出阻塞
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}

dispatch_semaphore_signal

了解了wait之后,对signal的理解也很简单。os_atomic_inc2o宏定义就是对dsema进行原子性+1的操作,如果大于0,则继续执行。

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	// 取值 + 1  == 0 + 1 = 1
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}

总结一下信号的底层原理:

信号量在初始化时要指定 value,随后内部将这个 value 进行函数式保存。实际操作时会存两个 value,一个是当前的value,一个是记录初始 value。信号的 wait 和 signal 是互逆的两个操作,wait进行减1的操作,single进行加1的操作。初始 value 必须大于等于 0,如果为0或者小于0 并随后调用 wait 方法,线程将被阻塞直到别的线程调用了 signal 方法

调度组dispatch_group

其实dispatch_group的相关函数的底层原理和信号量的底层原理的思想是一样的。都是在底层维护了一个value的值,进组和出组操作时,对value的值进行操作,达到0这个临界值的时候,进行后续的操作。

dispatch_group_create

和信号量类似,创建组后,对其进行了函数式保存dispatch_group_t,并通过os_atomic_store2o宏定义,内部维护了一个value的值

dispatch_group_t
dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}

-----------------------------------------------------------------------------------------

DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
	dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
			sizeof(struct dispatch_group_s));
	dg->do_next = DISPATCH_OBJECT_LISTLESS;
	dg->do_targetq = _dispatch_get_default_queue(false);
	if (n) {
		os_atomic_store2o(dg, dg_bits,
				-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
	}
	return dg;
}

dispatch_group_enter

通过源码,我们可以知道进组操作,主要是先通过os_atomic_sub_orig2o宏定义,对bit进行了原子性减1的操作,然后又通过位运算& DISPATCH_GROUP_VALUE_MASK获得真正的value

void
dispatch_group_enter(dispatch_group_t dg)
{
	// The value is decremented on a 32bits wide atomic so that the carry
	// for the 0 -> -1 transition is not propagated to the upper 32bits.
	uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
			DISPATCH_GROUP_VALUE_INTERVAL, acquire);
	uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
	if (unlikely(old_value == 0)) {
		_dispatch_retain(dg); // <rdar://problem/22318411>
	}
	if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
		DISPATCH_CLIENT_CRASH(old_bits,
				"Too many nested calls to dispatch_group_enter()");
	}
}

dispatch_group_leave

出组的操作即通过os_atomic_add_orig2o的对值进行原子性的加操作,并通过& DISPATCH_GROUP_VALUE_MASK获取到真实的value值。如果新旧两个值相等,则执行_dispatch_group_wake操作,进行后续的操作。

void
dispatch_group_leave(dispatch_group_t dg)
{
	// The value is incremented on a 64bits wide atomic so that the carry for
	// the -1 -> 0 transition increments the generation atomically.
	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
			DISPATCH_GROUP_VALUE_INTERVAL, release);
	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
		
		do {
			new_state = old_state;
			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			} else {
				// If the group was entered again since the atomic_add above,
				// we can't clear the waiters bit anymore as we don't know for
				// which generation the waiters are for
				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
			}
			if (old_state == new_state) break;
		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
				old_state, new_state, &old_state, relaxed)));
		
		return _dispatch_group_wake(dg, old_state, true);
	}

	if (unlikely(old_value == 0)) {
		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
				"Unbalanced call to dispatch_group_leave()");
	}
}

dispatch_group_async

dispatch_group_async函数就是对enterleave的封装。通过代码可以看出其和异步调用函数类似,都对block进行的封装保存。然后再内部执行的时候,手工调用了dispatch_group_enterdispatch_group_leave方法。

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;
    // 保存任务 
    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{   // 进组
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
    	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
    	_dispatch_trace_item_complete(dc);
    	// 出组
    	dispatch_group_leave((dispatch_group_t)dou);
    } else {
    	DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}

dispatch_group_notify

通过源码,我们可以发现,通过调用os_atomic_rmw_loop2o在系统内核中获取到对应的状态,最终还是调用到了_dispatch_group_wake

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
	uint64_t old_state, new_state;
	dispatch_continuation_t prev;

	dsn->dc_data = dq;
	_dispatch_retain(dq);

	prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
	os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
	if (os_mpsc_push_was_empty(prev)) {
		os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
			new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
			if ((uint32_t)old_state == 0) {
				os_atomic_rmw_loop_give_up({
					return _dispatch_group_wake(dg, new_state, false);
				});
			}
		});
	}
}

_dispatch_group_wake这个函数主要分为两部分,首先循环调用 semaphore_signal 告知唤醒当初等待 group 的信号量,因此 dispatch_group_wait 函数得以返回。

然后获取链表,依次调用 dispatch_async_f 异步执行在 notify 函数中注册的回调。

DISPATCH_NOINLINE
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
	uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

	if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
		dispatch_continuation_t dc, next_dc, tail;

		// Snapshot before anything is notified/woken <rdar://problem/8554546>
		dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
		do {
			dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
			next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
			_dispatch_continuation_async(dsn_queue, dc,
					_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
			_dispatch_release(dsn_queue);
		} while ((dc = next_dc));

		refs++;
	}

	if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
		_dispatch_wake_by_address(&dg->dg_gen);
	}

	if (refs) _dispatch_release_n(dg, refs);
}

总结

  • dispatch_sync 将任务 block 通过 push 到队列中,然后按照 FIFO 去执行。
  • dispatch_sync造成死锁的主要原因是堵塞的tid和现在运行的tid为同一个
  • dispatch_async 会把任务包装并保存,之后就会开辟相应线程去执行已保存的任务。
  • semaphore 主要在底层维护一个value的值,使用 signal 进行 + +1wait进行-1。如果value的值大于或者等于0,则取消阻塞,否则根据timeout参数进行超时判断
  • dispatch_group 底层也是维护了一个 value 的值,等待 group 完成实际上就是等待value恢复初始值。而notify的作用是将所有注册的回调组装成一个链表,在 dispatch_async 完成时判断 value 是不是恢复初始值,如果是则调用dispatch_async异步执行所有注册的回调。
  • dispatch_once 通过一个静态变量来标记 block 是否已被执行,同时使用加锁确保只有一个线程能执行,执行完 block 后会唤醒其他所有等待的线程。

参考资料

libdispatch

深入理解GCD