信号量是GCD中最常见的操作,通常用于保证资源的多线程安全性。其本质实际上是基于mach内核的信号量接口来实现的,本文将从源码角度对其进行解析。
@interface MyObject : NSObject
@property (nonatomic, strong) dispatch_semaphore_t sema;
@end
@implementation MyObject
@end
初始化信号量,然后可以看到这样一个结构:
myObj.sema = dispatch_semaphore_create(0);
// (lldb) po myObj.sema
// <OS_dispatch_semaphore: semaphore[0x6000007f14f0] = { xref = 1, ref = 1, port = 0x0, value = 0, orig = 0 }>
xref和ref是引用相关的。value和orig则是信号量执行任务的关键。执行一次dispatch_semaphore_wait操作后,value值会发生一次减操作。
dispatch_semaphore_wait(myObj.sema, DISPATCH_TIME_FOREVER);
// (lldb) po myObj.sema
// <OS_dispatch_semaphore: semaphore[0x60000133b890] = { xref = 2, ref = 1, port = 0x4007, value = -1, orig = 0 }>
那这些成员变量都是什么意思呢?
dispatch_semaphore_t
信号量的基本数据结构如下:
struct dispatch_semaphore_s {
DISPATCH_OBJECT_HEADER(semaphore);
long volatile dsema_value;
long dsema_orig;
_dispatch_sema4_t dsema_sema;
};
- DISPATCH_OBJECT_HEADER(semaphore),GCD中很多对象都有这个header,封装了一些统一的数据结构。
- dsema_orig即为信号量的初始值。
- dsema_value即为信号量当前值,信号量的相关API正是通过操作dsema_value来实现其功能的。
- _dispatch_sema4_t dsema_sema,信号量的结构。
_DISPATCH_OBJECT_HEADER是一个宏定义:
#define DISPATCH_OBJECT_HEADER(x) \
struct dispatch_object_s _as_do[0]; \
_DISPATCH_OBJECT_HEADER(x)
_DISPATCH_OBJECT_HEADER如下:
#define _DISPATCH_OBJECT_HEADER(x) \
struct _os_object_s _as_os_obj[0]; \
OS_OBJECT_STRUCT_HEADER(dispatch_##x); \
struct dispatch_##x##_s *volatile do_next; \
struct dispatch_queue_s *do_targetq; \
void *do_ctxt; \
void *do_finalizer
其中有两个成员比较关键:
struct dispatch_##x##_s *volatile do_next; \
struct dispatch_queue_s *do_targetq; \
这两个在后边会讲到。
dispatch_object_s对象也使用了_DISPATCH_OBJECT_HEADER:
struct dispatch_object_s {
_DISPATCH_OBJECT_HEADER(object);
};
/*
* Dispatch objects are NOT C++ objects. Nevertheless, we can at least keep C++
* aware of type compatibility.
*/
typedef struct dispatch_object_s {
private:
dispatch_object_s();
~dispatch_object_s();
dispatch_object_s(const dispatch_object_s &);
void operator=(const dispatch_object_s &);
} *dispatch_object_t;
typedef union {
struct _os_object_s *_os_obj;
struct dispatch_object_s *_do;
struct dispatch_queue_s *_dq;
struct dispatch_queue_attr_s *_dqa;
struct dispatch_group_s *_dg;
struct dispatch_source_s *_ds;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_semaphore_s *_dsema;
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
dispatch_object_t是一个联合体,libdispatch中的所有对象都需要使用到。
dispatch_semaphore_create
/*!
* @function dispatch_semaphore_create
*
* @abstract
* Creates new counting semaphore with an initial value.
*
* @discussion
* Passing zero for the value is useful for when two threads need to reconcile
* the completion of a particular event. Passing a value greater than zero is
* useful for managing a finite pool of resources, where the pool size is equal
* to the value.
*
* @param value
* The starting value for the semaphore. Passing a value less than zero will
* cause NULL to be returned.
*
* @result
* The newly created semaphore, or NULL on failure.
*/
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_MALLOC DISPATCH_RETURNS_RETAINED DISPATCH_WARN_RESULT
DISPATCH_NOTHROW
dispatch_semaphore_t
dispatch_semaphore_create(long value);
参数value如果小于0,则无效。其实现源码如下:
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
// 目标队列
dsema->do_targetq = _dispatch_get_default_queue(false);
// 当前值
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
// 初始值
dsema->dsema_orig = value;
return dsema;
}
_dispatch_object_alloc
_dispatch_object_alloc的第一个参数DISPATCH_VTABLE(semaphore),设置了dispatch_semaphore_t的相关回调函数,如销毁函数_dispatch_semaphore_dispose。
void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
const struct dispatch_object_vtable_s *_vtable = vtable;
dispatch_object_t dou;
dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size);
dou._do->do_vtable = vtable;
return dou._do;
#else
return _os_object_alloc_realized(vtable, size);
#endif
}
_os_object_alloc_realized函数如下,其中会调用_os_objc_alloc函数。
_os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
dispatch_assert(size >= sizeof(struct _os_object_s));
return _os_objc_alloc(cls, size);
}
static inline id
_os_objc_alloc(Class cls, size_t size)
{
id obj;
size -= sizeof(((struct _os_object_s *)NULL)->os_obj_isa);
while (unlikely(!(obj = class_createInstance(cls, size)))) {
_dispatch_temporary_resource_shortage();
}
return obj;
}
DISPATCH_VTABLE(semaphore)
DISPATCH_VTABLE的定义如下:
#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
// vtable symbols
#define OS_OBJECT_VTABLE(name) (&OS_OBJECT_CLASS_SYMBOL(name))
#define DISPATCH_OBJC_CLASS(name) (&DISPATCH_CLASS_SYMBOL(name))
#define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class
实际上, DISPATCH_VTABLE(semaphore) 即为 &OS_dispatch_semaphore_class。
另外还有一个宏DISPATCH_VTABLE_INSTANCE,
DISPATCH_VTABLE_INSTANCE(semaphore,
.do_type = DISPATCH_SEMAPHORE_TYPE,
.do_dispose = _dispatch_semaphore_dispose,
.do_debug = _dispatch_semaphore_debug,
.do_invoke = _dispatch_object_no_invoke,
);
#define DISPATCH_VTABLE_INSTANCE(name, ...) \
DISPATCH_VTABLE_SUBCLASS_INSTANCE(name, name, __VA_ARGS__)
#define DISPATCH_VTABLE_SUBCLASS_INSTANCE(name, ctype, ...) \
OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(dispatch_##name, dispatch_##ctype, \
_dispatch_xref_dispose, _dispatch_dispose, __VA_ARGS__)
// vtables for proper classes
#define OS_OBJECT_VTABLE_INSTANCE(name, xdispose, dispose, ...) \
OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(name, name, \
xdispose, dispose, __VA_ARGS__)
#define OS_OBJECT_VTABLE_SUBCLASS_INSTANCE(name, ctype, xdispose, dispose, ...) \
__attribute__((section("__DATA,__objc_data"), used)) \
const struct ctype##_extra_vtable_s \
OS_OBJECT_EXTRA_VTABLE_SYMBOL(name) = { __VA_ARGS__ }
#define OS_OBJECT_EXTRA_VTABLE_SYMBOL(name) _OS_##name##_vtable
这一堆宏定义,绕来绕去,意图就是建立一个vtable。vtable是虚函数表,可以通过索引方式来快速获取方法。相比于OC的方法查找,vtable的方式会有极大的性能提升。Swift中就大量使用了类似的vtable机制,如。
// 查处方法
let method = MyClass.vtable[methodIndex]
// 调用方法
method()
_dispatch_sema4_create
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sema4_create(_dispatch_sema4_t *sema, int policy)
{
if (!_dispatch_sema4_is_created(sema)) {
_dispatch_sema4_create_slow(sema, policy);
}
}
_dispatch_sema4_create_slow的源码如下:
#define _dispatch_sema4_is_created(sema) (*(sema) != MACH_PORT_NULL)
void
_dispatch_sema4_create_slow(_dispatch_sema4_t *s4, int policy)
{
semaphore_t tmp = MACH_PORT_NULL;
_dispatch_fork_becomes_unsafe();
// lazily allocate the semaphore port
// Someday:
// 1) Switch to a doubly-linked FIFO in user-space.
// 2) User-space timers for the timeout.
#if DISPATCH_USE_OS_SEMAPHORE_CACHE
if (policy == _DSEMA4_POLICY_FIFO) {
tmp = (_dispatch_sema4_t)os_get_cached_semaphore();
if (!os_atomic_cmpxchg(s4, MACH_PORT_NULL, tmp, relaxed)) {
os_put_cached_semaphore((os_semaphore_t)tmp);
}
return;
}
#endif
kern_return_t kr = semaphore_create(mach_task_self(), &tmp, policy, 0);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
if (!os_atomic_cmpxchg(s4, MACH_PORT_NULL, tmp, relaxed)) {
kr = semaphore_destroy(mach_task_self(), tmp);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
}
如果用到了信号量缓存(DISPATCH_USE_OS_SEMAPHORE_CACHE),且是FIFO,则会直接从cache中取出信号量来使用。
否则,就使用semaphore_create来创建一个新的信号量。
dispatch_semaphore_wait
wait操作,会将信号量的值减一,若减操作后的结果值为负数,则函数会一直等待信号量的释放。
/*!
* @function dispatch_semaphore_wait
*
* @abstract
* Wait (decrement) for a semaphore.
*
* @discussion
* Decrement the counting semaphore. If the resulting value is less than zero,
* this function waits for a signal to occur before returning.
*
* @param dsema
* The semaphore. The result of passing NULL in this parameter is undefined.
*
* @param timeout
* When to timeout (see dispatch_time). As a convenience, there are the
* DISPATCH_TIME_NOW and DISPATCH_TIME_FOREVER constants.
*
* @result
* Returns zero on success, or non-zero if the timeout occurred.
*/
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout);
源码实现如下:
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
dispatch_semaphore_wait一开始调用了系统的原子操作os_atomic_dec2o,将信号量的值减一。该操作之后,若信号量不是负数,则依然有信号量资源可用。若为负数,则执行_dispatch_semaphore_wait_slow进入等待。
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
_dispatch_semaphore_wait_slow函数根据timeout来决定等待行为,
- 如果是一个特定的timeout时间,则调用_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)进行timeout时间的等待行为。
- 如果是DISPATCH_TIME_NOW,先取得信号量的当前值,且该值一定为非负数。因为若为负数,则一定是上一次调用dispatch_semaphore_wait的时候执行的减操作变成负数,然而变成负数后,dispatch_semaphore_wait函数则会阻塞。之后,调用os_atomic_cmpxchgvw2o函数,会将信号量值加一,然后给到信号量的dsema_value变量。这个加一即为抵消最初调用dispatch_semaphore_wait时的减一操作。所以DISPATCH_TIME_NOW这种情况会立即返回超时。
- 如果是DISPATCH_TIME_FOREVER,则直接调用_dispatch_sema4_wait(&dsema->dsema_sema);永远等待,直至信号量释放。
_dispatch_sema4_wait和_dispatch_sema4_timedwait的源码如下,
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
kern_return_t kr;
do {
kr = semaphore_wait(*sema);
} while (kr == KERN_ABORTED);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
bool
_dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout)
{
mach_timespec_t _timeout;
kern_return_t kr;
do {
uint64_t nsec = _dispatch_timeout(timeout);
_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
kr = semaphore_timedwait(*sema, _timeout);
} while (unlikely(kr == KERN_ABORTED));
if (kr == KERN_OPERATION_TIMED_OUT) {
return true;
}
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
return false;
}
其中调用了mach内核的信号量接口semaphore_wait和semaphore_timedwait进行wait操作。所以,GCD的信号量实际上是基于mach内核的信号量接口来实现。semaphore_timedwait函数即可以指定超时时间。
dispatch_semaphore_signal
dispatch_semaphore_signal负责释放信号量。
/*!
* @function dispatch_semaphore_signal
*
* @abstract
* Signal (increment) a semaphore.
*
* @discussion
* Increment the counting semaphore. If the previous value was less than zero,
* this function wakes a waiting thread before returning.
*
* @param dsema The counting semaphore.
* The result of passing NULL in this parameter is undefined.
*
* @result
* This function returns non-zero if a thread is woken. Otherwise, zero is
* returned.
*/
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema);
dispatch_semaphore_signal的源码如下:
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
dispatch_semaphore_signal操作,则是执行了原子操作os_atomic_inc2o,将信号量的值加一。若过度释放,导致信号量的值为LONG_MIN,则会触发crash,信息为 ***Unbalanced call to dispatch_semaphore_signal()***。所以,跟GCD group的enter/leave类似,过度调用dispatch_semaphore_signal,理论上来说会导致崩溃。但并未实际复现出来,很奇怪。
而_dispatch_semaphore_signal_slow实际上会调用mach内核的semaphore_signal函数。
DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
void
_dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
{
do {
kern_return_t kr = semaphore_signal(*sema);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
} while (--count);
}
semaphore_signal能够唤醒一个在semaphore_wait中等待的线程。如果有多个等待线程,则根据线程优先级来唤醒。
_dispatch_semaphore_dispose
信号量的销毁函数如下:
void
_dispatch_semaphore_dispose(dispatch_object_t dou,
DISPATCH_UNUSED bool *allow_free)
{
dispatch_semaphore_t dsema = dou._dsema;
if (dsema->dsema_value < dsema->dsema_orig) {
DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value,
"Semaphore object deallocated while in use");
}
_dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
}
这里有一个判断,若 dsema->dsema_value < dsema->dsema_orig,则导致崩溃,并提示 Semaphore object deallocated while in use。这也是容易遇到的问题之一,后边会讲到。
_dispatch_sema4_dispose代码如下:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sema4_dispose(_dispatch_sema4_t *sema, int policy)
{
if (_dispatch_sema4_is_created(sema)) {
_dispatch_sema4_dispose_slow(sema, policy);
}
}
进一步往下,会调用_dispatch_sema4_dispose_slow函数,
void
_dispatch_sema4_dispose_slow(_dispatch_sema4_t *sema, int policy)
{
semaphore_t sema_port = *sema;
*sema = MACH_PORT_DEAD;
#if DISPATCH_USE_OS_SEMAPHORE_CACHE
if (policy == _DSEMA4_POLICY_FIFO) {
return os_put_cached_semaphore((os_semaphore_t)sema_port);
}
#endif
kern_return_t kr = semaphore_destroy(mach_task_self(), sema_port);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
如果使用了信号量缓存,且FIFO,则将待回收的信号量对象放入缓存即可。否则,调用mach内核的semaphore_destroy函数进行信号量的销毁。
一些崩溃
跟GCD group的enter/leave类似,这一类接口要保证操作的平衡。否则可能导致严重的问题。苹果的文档已经说得很清楚了:
Calls to dispatch_semaphore_signal must be balanced with calls to dispatch_semaphore_wait. Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception.
Unbalanced call to dispatch_semaphore_signal()
这一点暂时未能复现。不过看原理,应该就是过度调用dispatch_semaphore_signal导致的。
Semaphore object deallocated while in use
Exception Type: EXC_BREAKPOINT (SIGTRAP)
Exception Codes: 0x0000000000000001, 0x00000001a3b77ff4
Termination Signal: Trace/BPT trap: 5
Termination Reason: Namespace SIGNAL, Code 0x5
Terminating Process: exc handler [249]
Triggered by Thread: 15
Application Specific Information:
BUG IN CLIENT OF LIBDISPATCH: Semaphore object deallocated while in use
Abort Cause 1
Thread 15 name: Dispatch queue: com.xxxx.xxxx.xxxxQueue (QOS: UNSPECIFIED)
Thread 15 Crashed:
0 libdispatch.dylib 0x00000001a3b77ff4 0x1a3b75000 + 12276
1 libdispatch.dylib 0x00000001a3b77014 0x1a3b75000 + 8212
2 libobjc.A.dylib 0x00000001a336e7cc 0x1a336a000 + 18380
3 libobjc.A.dylib 0x00000001a337e6b8 0x1a336a000 + 83640
4 libobjc.A.dylib 0x00000001a337e720 0x1a336a000 + 83744
5 XXXX 0x000000010535056c -[MyXXXXObject dealloc] + 70960492 (MyXXXXObject.mm:xx)
6 XXXX 0x000000010535171c __destroy_helper_block_ea8_32s40s48s56s64r + 70965020 (MyXXXXObject.mm:xxx)
7 libsystem_blocks.dylib 0x00000001a3c30a44 0x1a3c30000 + 2628
8 Foundation 0x00000001a4b09410 0x1a4aea000 + 128016
9 Foundation 0x00000001a4b97330 0x1a4aea000 + 709424
10 libsystem_blocks.dylib 0x00000001a3c30a44 0x1a3c30000 + 2628
11 libdispatch.dylib 0x00000001a3bd57d4 0x1a3b75000 + 395220
12 libdispatch.dylib 0x00000001a3b7a01c 0x1a3b75000 + 20508
13 libdispatch.dylib 0x00000001a3b796e0 0x1a3b75000 + 18144
14 libdispatch.dylib 0x00000001a3b86030 0x1a3b75000 + 69680
15 libdispatch.dylib 0x00000001a3b868d4 0x1a3b75000 + 71892
16 libsystem_pthread.dylib 0x00000001a3db61b4 0x1a3daa000 + 49588
17 libsystem_pthread.dylib 0x00000001a3db8cd4 0x1a3daa000 + 60628
这类crash的原因在于对象释放的时候,其内部持有的信号量对象依然在使用(即dsema->dsema_value < dsema->dsema_orig,信号量的值未通过signal操作恢复到其原始值)。常见于:其他线程执行了wait操作,而在没有对应signal的前提下,即将该信号量释放了(持有信号量的对象释放导致的)。
可以通过下边代码来复现,以查看调用堆栈。
dispatch_semaphore_t sema = dispatch_semaphore_create(1);
dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);
sema = dispatch_semaphore_create(1);
在信号量依然在使用的时候,重新给sema赋值就会导致崩溃,即便是 sema = nil; 也会。堆栈如下:
_dispatch_semaphore_dispose.cold.1
_disaptch_semaphore_dispose
_dispatch_dispose
sema = dispatch_semaphore_create(1);
当然,_disaptch_semaphore_dispose中会调用_dispatch_sema4_dispose_slow,这些我们在之前代码就已经分析过了。
解决办法:如果对于已知场景的信号量,比如信号量初始值为1的情况,可以在对象dealloc的时候手动执行一次signal操作,以免出现 dsema->dsema_value < dsema->dsema_orig 的情况。毕竟多执行了signal操作,目前是不会有问题的。
然而,实际的使用场景还是要尽量保证信号量的wait和signal保持平衡,这样代码逻辑才不会有问题。