iOS源码解析: GCD的信号量semaphore

5,383 阅读10分钟

信号量是GCD中最常见的操作,通常用于保证资源的多线程安全性。其本质实际上是基于mach内核的信号量接口来实现的,本文将从源码角度对其进行解析。

@interface MyObject : NSObject
@property (nonatomic, strong) dispatch_semaphore_t sema;
@end

@implementation MyObject
@end

初始化信号量,然后可以看到这样一个结构:

myObj.sema = dispatch_semaphore_create(0);
// (lldb) po myObj.sema
// <OS_dispatch_semaphore: semaphore[0x6000007f14f0] = { xref = 1, ref = 1, port = 0x0, value = 0, orig = 0 }>

xref和ref是引用相关的。value和orig则是信号量执行任务的关键。执行一次dispatch_semaphore_wait操作后,value值会发生一次减操作。

dispatch_semaphore_wait(myObj.sema, DISPATCH_TIME_FOREVER);
// (lldb) po myObj.sema
// <OS_dispatch_semaphore: semaphore[0x60000133b890] = { xref = 2, ref = 1, port = 0x4007, value = -1, orig = 0 }>

那这些成员变量都是什么意思呢?

dispatch_semaphore_t

信号量的基本数据结构如下:

struct dispatch_semaphore_s {
	DISPATCH_OBJECT_HEADER(semaphore);
	long volatile dsema_value;
	long dsema_orig;
	_dispatch_sema4_t dsema_sema;
};
  1. DISPATCH_OBJECT_HEADER(semaphore),GCD中很多对象都有这个header,封装了一些统一的数据结构。
  2. dsema_orig即为信号量的初始值。
  3. dsema_value即为信号量当前值,信号量的相关API正是通过操作dsema_value来实现其功能的。
  4. _dispatch_sema4_t dsema_sema,信号量的结构。

_DISPATCH_OBJECT_HEADER是一个宏定义:

#define DISPATCH_OBJECT_HEADER(x) \
	struct dispatch_object_s _as_do[0]; \
	_DISPATCH_OBJECT_HEADER(x)

_DISPATCH_OBJECT_HEADER如下:

#define _DISPATCH_OBJECT_HEADER(x) \
	struct _os_object_s _as_os_obj[0]; \
	OS_OBJECT_STRUCT_HEADER(dispatch_##x); \
	struct dispatch_##x##_s *volatile do_next; \
	struct dispatch_queue_s *do_targetq; \
	void *do_ctxt; \
	void *do_finalizer

其中有两个成员比较关键:

struct dispatch_##x##_s *volatile do_next; \
struct dispatch_queue_s *do_targetq; \

这两个在后边会讲到。

dispatch_object_s对象也使用了_DISPATCH_OBJECT_HEADER:

struct dispatch_object_s {
	_DISPATCH_OBJECT_HEADER(object);
};

/*
 * Dispatch objects are NOT C++ objects. Nevertheless, we can at least keep C++
 * aware of type compatibility.
 */
typedef struct dispatch_object_s {
private:
	dispatch_object_s();
	~dispatch_object_s();
	dispatch_object_s(const dispatch_object_s &);
	void operator=(const dispatch_object_s &);
} *dispatch_object_t;

typedef union {
	struct _os_object_s *_os_obj;
	struct dispatch_object_s *_do;
	struct dispatch_queue_s *_dq;
	struct dispatch_queue_attr_s *_dqa;
	struct dispatch_group_s *_dg;
	struct dispatch_source_s *_ds;
	struct dispatch_mach_s *_dm;
	struct dispatch_mach_msg_s *_dmsg;
	struct dispatch_semaphore_s *_dsema;
	struct dispatch_data_s *_ddata;
	struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;

dispatch_object_t是一个联合体,libdispatch中的所有对象都需要使用到。

dispatch_semaphore_create

/*!
 * @function dispatch_semaphore_create
 *
 * @abstract
 * Creates new counting semaphore with an initial value.
 *
 * @discussion
 * Passing zero for the value is useful for when two threads need to reconcile
 * the completion of a particular event. Passing a value greater than zero is
 * useful for managing a finite pool of resources, where the pool size is equal
 * to the value.
 *
 * @param value
 * The starting value for the semaphore. Passing a value less than zero will
 * cause NULL to be returned.
 *
 * @result
 * The newly created semaphore, or NULL on failure.
 */
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_MALLOC DISPATCH_RETURNS_RETAINED DISPATCH_WARN_RESULT
DISPATCH_NOTHROW
dispatch_semaphore_t
dispatch_semaphore_create(long value);

参数value如果小于0,则无效。其实现源码如下:

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;

	// If the internal value is negative, then the absolute of the value is
	// equal to the number of waiting threads. Therefore it is bogus to
	// initialize the semaphore with a negative value.
	if (value < 0) {
		return DISPATCH_BAD_INPUT;
	}

	dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
			sizeof(struct dispatch_semaphore_s));
	dsema->do_next = DISPATCH_OBJECT_LISTLESS;
	// 目标队列
	dsema->do_targetq = _dispatch_get_default_queue(false);
    // 当前值
	dsema->dsema_value = value;
	_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    // 初始值
	dsema->dsema_orig = value;
	return dsema;
}

_dispatch_object_alloc

_dispatch_object_alloc的第一个参数DISPATCH_VTABLE(semaphore),设置了dispatch_semaphore_t的相关回调函数,如销毁函数_dispatch_semaphore_dispose。

void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
	const struct dispatch_object_vtable_s *_vtable = vtable;
	dispatch_object_t dou;
	dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
	dou._do->do_vtable = vtable;
	return dou._do;
#else
	return _os_object_alloc_realized(vtable, size);
#endif
}

_os_object_alloc_realized函数如下,其中会调用_os_objc_alloc函数。

_os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
	dispatch_assert(size >= sizeof(struct _os_object_s));
	return _os_objc_alloc(cls, size);
}
static inline id
_os_objc_alloc(Class cls, size_t size)
{
	id obj;
	size -= sizeof(((struct _os_object_s *)NULL)->os_obj_isa);
	while (unlikely(!(obj = class_createInstance(cls, size)))) {
		_dispatch_temporary_resource_shortage();
	}
	return obj;
}

DISPATCH_VTABLE(semaphore)

DISPATCH_VTABLE的定义如下:

#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)

// vtable symbols
#define OS_OBJECT_VTABLE(name)		(&OS_OBJECT_CLASS_SYMBOL(name))
#define DISPATCH_OBJC_CLASS(name)	(&DISPATCH_CLASS_SYMBOL(name))

#define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class

实际上, DISPATCH_VTABLE(semaphore) 即为 &OS_dispatch_semaphore_class

另外还有一个宏DISPATCH_VTABLE_INSTANCE,

DISPATCH_VTABLE_INSTANCE(semaphore,
	.do_type        = DISPATCH_SEMAPHORE_TYPE,
	.do_dispose     = _dispatch_semaphore_dispose,
	.do_debug       = _dispatch_semaphore_debug,
	.do_invoke      = _dispatch_object_no_invoke,
);

#define DISPATCH_VTABLE_INSTANCE(name, ...) \
		DISPATCH_VTABLE_SUBCLASS_INSTANCE(name, name, __VA_ARGS__)

#define DISPATCH_VTABLE_SUBCLASS_INSTANCE(name, ctype, ...) \
		OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(dispatch_##name, dispatch_##ctype, \
				_dispatch_xref_dispose, _dispatch_dispose, __VA_ARGS__)		

// vtables for proper classes
#define OS_OBJECT_VTABLE_INSTANCE(name, xdispose, dispose, ...) \
		OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(name, name, \
				xdispose, dispose, __VA_ARGS__)

#define OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(name, ctype, xdispose, dispose, ...) \
		__attribute__((section("__DATA,__objc_data"), used)) \
		const struct ctype##_extra_vtable_s \
		OS_OBJECT_EXTRA_VTABLE_SYMBOL(name) = { __VA_ARGS__ }

#define OS_OBJECT_EXTRA_VTABLE_SYMBOL(name) _OS_##name##_vtable										

这一堆宏定义,绕来绕去,意图就是建立一个vtable。vtable是虚函数表,可以通过索引方式来快速获取方法。相比于OC的方法查找,vtable的方式会有极大的性能提升。Swift中就大量使用了类似的vtable机制,如。

// 查处方法
let method = MyClass.vtable[methodIndex]
// 调用方法
method()

_dispatch_sema4_create

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sema4_create(_dispatch_sema4_t *sema, int policy)
{
	if (!_dispatch_sema4_is_created(sema)) {
		_dispatch_sema4_create_slow(sema, policy);
	}
}

_dispatch_sema4_create_slow的源码如下:

#define _dispatch_sema4_is_created(sema)   (*(sema) != MACH_PORT_NULL)

void
_dispatch_sema4_create_slow(_dispatch_sema4_t *s4, int policy)
{
	semaphore_t tmp = MACH_PORT_NULL;

	_dispatch_fork_becomes_unsafe();

	// lazily allocate the semaphore port

	// Someday:
	// 1) Switch to a doubly-linked FIFO in user-space.
	// 2) User-space timers for the timeout.

#if DISPATCH_USE_OS_SEMAPHORE_CACHE
	if (policy == _DSEMA4_POLICY_FIFO) {
		tmp = (_dispatch_sema4_t)os_get_cached_semaphore();
		if (!os_atomic_cmpxchg(s4, MACH_PORT_NULL, tmp, relaxed)) {
			os_put_cached_semaphore((os_semaphore_t)tmp);
		}
		return;
	}
#endif

	kern_return_t kr = semaphore_create(mach_task_self(), &tmp, policy, 0);
	DISPATCH_SEMAPHORE_VERIFY_KR(kr);

	if (!os_atomic_cmpxchg(s4, MACH_PORT_NULL, tmp, relaxed)) {
		kr = semaphore_destroy(mach_task_self(), tmp);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	}
}

如果用到了信号量缓存(DISPATCH_USE_OS_SEMAPHORE_CACHE),且是FIFO,则会直接从cache中取出信号量来使用。

否则,就使用semaphore_create来创建一个新的信号量。

dispatch_semaphore_wait

wait操作,会将信号量的值减一,若减操作后的结果值为负数,则函数会一直等待信号量的释放。

/*!
 * @function dispatch_semaphore_wait
 *
 * @abstract
 * Wait (decrement) for a semaphore.
 *
 * @discussion
 * Decrement the counting semaphore. If the resulting value is less than zero,
 * this function waits for a signal to occur before returning.
 *
 * @param dsema
 * The semaphore. The result of passing NULL in this parameter is undefined.
 *
 * @param timeout
 * When to timeout (see dispatch_time). As a convenience, there are the
 * DISPATCH_TIME_NOW and DISPATCH_TIME_FOREVER constants.
 *
 * @result
 * Returns zero on success, or non-zero if the timeout occurred.
 */
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout);

源码实现如下:

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (likely(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}

dispatch_semaphore_wait一开始调用了系统的原子操作os_atomic_dec2o,将信号量的值减一。该操作之后,若信号量不是负数,则依然有信号量资源可用。若为负数,则执行_dispatch_semaphore_wait_slow进入等待。

DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _DSEMA4_TIMEOUT();
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_dispatch_sema4_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}

_dispatch_semaphore_wait_slow函数根据timeout来决定等待行为,

  1. 如果是一个特定的timeout时间,则调用_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)进行timeout时间的等待行为。
  2. 如果是DISPATCH_TIME_NOW,先取得信号量的当前值,且该值一定为非负数。因为若为负数,则一定是上一次调用dispatch_semaphore_wait的时候执行的减操作变成负数,然而变成负数后,dispatch_semaphore_wait函数则会阻塞。之后,调用os_atomic_cmpxchgvw2o函数,会将信号量值加一,然后给到信号量的dsema_value变量。这个加一即为抵消最初调用dispatch_semaphore_wait时的减一操作。所以DISPATCH_TIME_NOW这种情况会立即返回超时。
  3. 如果是DISPATCH_TIME_FOREVER,则直接调用_dispatch_sema4_wait(&dsema->dsema_sema);永远等待,直至信号量释放。

_dispatch_sema4_wait和_dispatch_sema4_timedwait的源码如下,

void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
	kern_return_t kr;
	do {
		kr = semaphore_wait(*sema);
	} while (kr == KERN_ABORTED);
	DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}

bool
_dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout)
{
	mach_timespec_t _timeout;
	kern_return_t kr;

	do {
		uint64_t nsec = _dispatch_timeout(timeout);
		_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
		_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
		kr = semaphore_timedwait(*sema, _timeout);
	} while (unlikely(kr == KERN_ABORTED));

	if (kr == KERN_OPERATION_TIMED_OUT) {
		return true;
	}
	DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	return false;
}

其中调用了mach内核的信号量接口semaphore_wait和semaphore_timedwait进行wait操作。所以,GCD的信号量实际上是基于mach内核的信号量接口来实现。semaphore_timedwait函数即可以指定超时时间。

dispatch_semaphore_signal

dispatch_semaphore_signal负责释放信号量。

/*!
 * @function dispatch_semaphore_signal
 *
 * @abstract
 * Signal (increment) a semaphore.
 *
 * @discussion
 * Increment the counting semaphore. If the previous value was less than zero,
 * this function wakes a waiting thread before returning.
 *
 * @param dsema The counting semaphore.
 * The result of passing NULL in this parameter is undefined.
 *
 * @result
 * This function returns non-zero if a thread is woken. Otherwise, zero is
 * returned.
 */
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema);

dispatch_semaphore_signal的源码如下:

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (likely(value > 0)) {
		return 0;
	}
	if (unlikely(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}

dispatch_semaphore_signal操作,则是执行了原子操作os_atomic_inc2o,将信号量的值加一。若过度释放,导致信号量的值为LONG_MIN,则会触发crash,信息为 ***Unbalanced call to dispatch_semaphore_signal()***。所以,跟GCD group的enter/leave类似,过度调用dispatch_semaphore_signal,理论上来说会导致崩溃。但并未实际复现出来,很奇怪。

而_dispatch_semaphore_signal_slow实际上会调用mach内核的semaphore_signal函数。

DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
	_dispatch_sema4_signal(&dsema->dsema_sema, 1);
	return 1;
}

void
_dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
{
	do {
		kern_return_t kr = semaphore_signal(*sema);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	} while (--count);
}

semaphore_signal能够唤醒一个在semaphore_wait中等待的线程。如果有多个等待线程,则根据线程优先级来唤醒。

_dispatch_semaphore_dispose

信号量的销毁函数如下:

void
_dispatch_semaphore_dispose(dispatch_object_t dou,
		DISPATCH_UNUSED bool *allow_free)
{
	dispatch_semaphore_t dsema = dou._dsema;

	if (dsema->dsema_value < dsema->dsema_orig) {
		DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value,
				"Semaphore object deallocated while in use");
	}

	_dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
}

这里有一个判断,若 dsema->dsema_value < dsema->dsema_orig,则导致崩溃,并提示 Semaphore object deallocated while in use。这也是容易遇到的问题之一,后边会讲到。

_dispatch_sema4_dispose代码如下:

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sema4_dispose(_dispatch_sema4_t *sema, int policy)
{
	if (_dispatch_sema4_is_created(sema)) {
		_dispatch_sema4_dispose_slow(sema, policy);
	}
}

进一步往下,会调用_dispatch_sema4_dispose_slow函数,

void
_dispatch_sema4_dispose_slow(_dispatch_sema4_t *sema, int policy)
{
	semaphore_t sema_port = *sema;
	*sema = MACH_PORT_DEAD;
#if DISPATCH_USE_OS_SEMAPHORE_CACHE
	if (policy == _DSEMA4_POLICY_FIFO) {
		return os_put_cached_semaphore((os_semaphore_t)sema_port);
	}
#endif
	kern_return_t kr = semaphore_destroy(mach_task_self(), sema_port);
	DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}

如果使用了信号量缓存,且FIFO,则将待回收的信号量对象放入缓存即可。否则,调用mach内核的semaphore_destroy函数进行信号量的销毁。

一些崩溃

跟GCD group的enter/leave类似,这一类接口要保证操作的平衡。否则可能导致严重的问题。苹果的文档已经说得很清楚了:

Calls to dispatch_semaphore_signal must be balanced with calls to dispatch_semaphore_wait. Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception.

Unbalanced call to dispatch_semaphore_signal()

这一点暂时未能复现。不过看原理,应该就是过度调用dispatch_semaphore_signal导致的。

Semaphore object deallocated while in use

Exception Type:  EXC_BREAKPOINT (SIGTRAP)
Exception Codes: 0x0000000000000001, 0x00000001a3b77ff4
Termination Signal: Trace/BPT trap: 5
Termination Reason: Namespace SIGNAL, Code 0x5
Terminating Process: exc handler [249]
Triggered by Thread:  15

Application Specific Information:
BUG IN CLIENT OF LIBDISPATCH: Semaphore object deallocated while in use
Abort Cause 1
Thread 15 name:  Dispatch queue: com.xxxx.xxxx.xxxxQueue (QOS: UNSPECIFIED)

Thread 15 Crashed:
0   libdispatch.dylib             	0x00000001a3b77ff4 0x1a3b75000 + 12276
1   libdispatch.dylib             	0x00000001a3b77014 0x1a3b75000 + 8212
2   libobjc.A.dylib               	0x00000001a336e7cc 0x1a336a000 + 18380
3   libobjc.A.dylib               	0x00000001a337e6b8 0x1a336a000 + 83640
4   libobjc.A.dylib               	0x00000001a337e720 0x1a336a000 + 83744
5   XXXX                          	0x000000010535056c -[MyXXXXObject dealloc] + 70960492 (MyXXXXObject.mm:xx)
6   XXXX                          	0x000000010535171c __destroy_helper_block_ea8_32s40s48s56s64r + 70965020 (MyXXXXObject.mm:xxx)
7   libsystem_blocks.dylib        	0x00000001a3c30a44 0x1a3c30000 + 2628
8   Foundation                    	0x00000001a4b09410 0x1a4aea000 + 128016
9   Foundation                    	0x00000001a4b97330 0x1a4aea000 + 709424
10  libsystem_blocks.dylib        	0x00000001a3c30a44 0x1a3c30000 + 2628
11  libdispatch.dylib             	0x00000001a3bd57d4 0x1a3b75000 + 395220
12  libdispatch.dylib             	0x00000001a3b7a01c 0x1a3b75000 + 20508
13  libdispatch.dylib             	0x00000001a3b796e0 0x1a3b75000 + 18144
14  libdispatch.dylib             	0x00000001a3b86030 0x1a3b75000 + 69680
15  libdispatch.dylib             	0x00000001a3b868d4 0x1a3b75000 + 71892
16  libsystem_pthread.dylib       	0x00000001a3db61b4 0x1a3daa000 + 49588
17  libsystem_pthread.dylib       	0x00000001a3db8cd4 0x1a3daa000 + 60628

这类crash的原因在于对象释放的时候,其内部持有的信号量对象依然在使用(即dsema->dsema_value < dsema->dsema_orig,信号量的值未通过signal操作恢复到其原始值)。常见于:其他线程执行了wait操作,而在没有对应signal的前提下,即将该信号量释放了(持有信号量的对象释放导致的)。

可以通过下边代码来复现,以查看调用堆栈。

dispatch_semaphore_t sema = dispatch_semaphore_create(1);
dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);

sema = dispatch_semaphore_create(1);

在信号量依然在使用的时候,重新给sema赋值就会导致崩溃,即便是 sema = nil; 也会。堆栈如下:

_dispatch_semaphore_dispose.cold.1
_disaptch_semaphore_dispose
_dispatch_dispose
sema = dispatch_semaphore_create(1);

当然,_disaptch_semaphore_dispose中会调用_dispatch_sema4_dispose_slow,这些我们在之前代码就已经分析过了。

解决办法:如果对于已知场景的信号量,比如信号量初始值为1的情况,可以在对象dealloc的时候手动执行一次signal操作,以免出现 dsema->dsema_value < dsema->dsema_orig 的情况。毕竟多执行了signal操作,目前是不会有问题的。

然而,实际的使用场景还是要尽量保证信号量的wait和signal保持平衡,这样代码逻辑才不会有问题。

参考资料