iOS多线程锁之@synchronized原理分析

1,359 阅读7分钟

在iOS多线程开发当中,不可避免会遇到数据同步的问题,其中一种解决方案就是通过加锁来防止两条线程同时操作同一片内存空间。今天我们主要来探索一下一种比较常见的锁@synchronized同步锁。

代码示例

首先我们来看一段简单的代码,可以通过xcode转换为汇编代码来看一下@synchronized到底做了什么。 接下来在xcode中对于objc_sync_enterobjc_sync_exit打下符号断点。 可以看到,@synchronized代码块确实调用了上述两个函数,接下来我们到源码中去一探究竟。

源码分析

objc_sync_enter

// Begin synchronizing on 'obj'. 
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.  
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;

    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        ASSERT(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }

    return result;
}

注释简单翻译如下:

开始obj的同步工作,必要时开辟一个关联着obj的互斥递归锁,当获得锁之后返回OBJC_SYNC_SUCCESS。

通过注释我们就可以得出这样一个结论,@synchronized是一个互斥递归锁。

objc_sync_enter主要逻辑如下:

  1. obj不为空时,获取SyncData* data,取出data->mutex进行加锁
  2. obj为空时,执行obj_sync_nil,通过源码查看其实什么也没有处理。

那么核心肯定就在这个SyncData了: 这个结构很容易让人联想到链表,其中的

接下来看一下获取data的方法id2data

id2data

该方法代码比较多,一共有160多行,大家可以打开一份objc的源码同步观看。通过上图大致分为6个操作,我们一步一步来解析。

操作1

spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);

我们来看一下LOCK_FOR_OBJ和LIST_FOR_OBJ这两个宏都干了什么

#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;

简单看一下StripedMap

template<typename T>
class StripedMap {
#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
    enum { StripeCount = 8 };
#else
    enum { StripeCount = 64 };
#endif
	// 内部有一个T类型的value值
    struct PaddedT {
        T value alignas(CacheLineSize);
    };
	// array来存储PaddedT
    PaddedT array[StripeCount];
	// 哈希函数
    static unsigned int indexForPointer(const void *p) {
        uintptr_t addr = reinterpret_cast<uintptr_t>(p);
        return ((addr >> 4) ^ (addr >> 9)) % StripeCount;
    }

 public:
    T& operator[] (const void *p) { 
        return array[indexForPointer(p)].value; 
    }
...

StripedMap其实是一个哈希表,内部是一个容量为8的数组,存储T类型的数据,在当前的情况下,就是SyncList

struct SyncList {
    SyncData *data;
    spinlock_t lock;
    constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { }
};

通过这一步我们就可以知道其实上述两个宏的主要作用就是通过哈希算法得出obj所在的SyncList,进一步取出对应的数组dataspinlock_t锁

总结一下操作1的主要工作是通过哈希算法取出我们需要加锁的obj所对应的SyncList中的数组以及一把spinlock_t锁

操作2

#if SUPPORT_DIRECT_THREAD_KEYS
    // Check per-thread single-entry fast cache for matching object
    // 检查单独线程的快速缓存
    bool fastCacheOccupied = NO;
    // 通过tls来获取SyncData
    SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
    if (data) {
        fastCacheOccupied = YES;
	// 校验取出的data是否和此次的object一致
        if (data->object == object) {
            // Found a match in fast cache.
            uintptr_t lockCount;

            result = data;
            // 获取当前线程中对于object的加锁次数,因为是递归锁,所以存在多次加锁
            lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
            if (result->threadCount <= 0  ||  lockCount <= 0) {
                _objc_fatal("id2data fastcache is buggy");
            }

            switch(why) {
            case ACQUIRE: {
            // ACQUIRE类型表示新增加了一次锁
                lockCount++;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                break;
            }
            case RELEASE:
            // RELEASE表示此次加锁结束了
                lockCount--;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                // 0代表当前线程已经没有针对object加锁,此时thread_count需要减一
                if (lockCount == 0) {
                    // remove from fast cache
                    tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }
#endif

上述代码中已经有对应的注释,简单概括就是从当前线程的本地存储tls中快速的查找obj对应的SyncData,找到的话根据传入的参数ACQUIRE/RELEASE选择相应的操作:修改lockCount以及threadCount,同时更新tls中的值

操作3

如果tls中没有找到对应的SyncData,会进入操作3。

// Check per-thread cache of already-owned locks for matching object
// 获取缓存
    SyncCache *cache = fetch_cache(NO);
    if (cache) {
        unsigned int i;
        for (i = 0; i < cache->used; i++) {
            SyncCacheItem *item = &cache->list[i];
            if (item->data->object != object) continue;
			// 遍历cache查找object对应的cacheItem
            // Found a match.
            result = item->data;
            if (result->threadCount <= 0  ||  item->lockCount <= 0) {
                _objc_fatal("id2data cache is buggy");
            }
                
            switch(why) {
            case ACQUIRE:
                item->lockCount++;
                break;
            case RELEASE:
                item->lockCount--;
                if (item->lockCount == 0) {
                    // remove from per-thread cache
                    cache->list[i] = cache->list[--cache->used];
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }

顺带看一下SyncCacheItem以及fetch_cache的实现

typedef struct {
    SyncData *data;
    unsigned int lockCount;  // number of times THIS THREAD locked this block
} SyncCacheItem;

static SyncCache *fetch_cache(bool create)
{
    _objc_pthread_data *data;
    
    data = _objc_fetch_pthread_data(create);
    if (!data) return NULL;
	
    if (!data->syncCache) {
        if (!create) {
            return NULL;
        } else {
        	// 默认缓存容量为4
            int count = 4;
            data->syncCache = (SyncCache *)
                calloc(1, sizeof(SyncCache) + count*sizeof(SyncCacheItem));
            data->syncCache->allocated = count;
        }
    }

    // Make sure there's at least one open slot in the list.
    // 扩容
    if (data->syncCache->allocated == data->syncCache->used) {
        data->syncCache->allocated *= 2;
        data->syncCache = (SyncCache *)
            realloc(data->syncCache, sizeof(SyncCache) 
                    + data->syncCache->allocated * sizeof(SyncCacheItem));
    }

    return data->syncCache;
}

可以发现,其实和操作2类似,操作3会从SyncCache中取出对应的SyncData,之后进行个操作2类似的处理。

操作4

cache还没有创建

// 加锁
lockp->lock();

    {
        SyncData* p;
        SyncData* firstUnused = NULL;
        // 在全局的listp链表中查找object对应的SyncData
        for (p = *listp; p != NULL; p = p->nextData) {
            if ( p->object == object ) {
                result = p;
                // atomic because may collide with concurrent RELEASE
                // 找到说明当前线程是第一次对object进行加锁,此时需要threadCount+1
                OSAtomicIncrement32Barrier(&result->threadCount);
                goto done;
            }
            // 如果当前结点的threadCount为0,即当前结点对应的object没有一条线程有加锁操作
            if ( (firstUnused == NULL) && (p->threadCount == 0) )
                firstUnused = p;
        }
    
        // no SyncData currently associated with object
        if ( (why == RELEASE) || (why == CHECK) )
            goto done;
    
        // an unused one was found, use it
        // 如果找到一个没有用的结点,对结点进行重新初始化,重新赋值
        if ( firstUnused != NULL ) {
            result = firstUnused;
            result->object = (objc_object *)object;
            result->threadCount = 1;
            goto done;
        }
    }

这一步总的来说就是如果缓存都还没有创建,那么需要从全局的listp链表中寻找Syncdata,如果这也没找到,但是找到了空结点,就对空结点进行赋值

操作5

// 链表中的结点都被占用,此时只能创建新的结点了
    posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
    result->object = (objc_object *)object;
    result->threadCount = 1;
    new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
    // 头插法插入到当前的listp链表中
    result->nextData = *listp;
    *listp = result;

紧接着上面的listp查找,如果listp没有空结点,只能创建新的结点,然后头插法插入链表listp中

操作6

done:
	// 此时object对应的syncdata已经创建完毕,并且存储完成,对于多线程已经没有了风险,可以解锁了
    lockp->unlock();
    if (result) {
        ...
#if SUPPORT_DIRECT_THREAD_KEYS
	// 如果快速缓存即tls还没有被占用,存储快速缓存中
        if (!fastCacheOccupied) {
            // Save in fast thread cache
            tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
            tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
        } else 
#endif
	// 否则存储到线程的缓存syncCache中
        {
            // Save in thread cache
            // cache不存在的话需要先创建cache
            if (!cache) cache = fetch_cache(YES);
            cache->list[cache->used].data = result;
            cache->list[cache->used].lockCount = 1;
            cache->used++;
        }
    }
    return result;

id2data总结

至此id2data的源码终于分析完了,总结一下,本质就是一个找object对应的syncData的过程,先从tls即fast cache中找,再从线程的syncCache中找,最后从全局的listp链表中找,都找不到的话只能自己创建,然后存储到对应的位置。

为了便于理解,贴一张SyncData的存储结构图: 从前面的代码可以翻到,在ios系统中,全局的哈希表容量为8。

objc_sync_exit

int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    
    if (obj) {
        SyncData* data = id2data(obj, RELEASE); 
        if (!data) {
            result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        } else {
            bool okay = data->mutex.tryUnlock();
            if (!okay) {
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            }
        }
    } else {
        // @synchronized(nil) does nothing
    }
	

    return result;
}

在分析过id2data后,objc_sync_exit就比较简单了,也是找到对应的SyncData,然后进行解锁,就不做过多的分析了。

@synchronized使用注意点

首先我们来看如下代码: 在执行过程中出现了崩溃,崩溃原因是同时有两条线程执行了赋值代码时,导致原有的_testArray旧值被释放了两次,我们做如下改动: 结果还是崩溃了,原因也是因为过度释放造成的,大家可能会有疑问,为什么我明明加了锁还是不行呢?

原因主要出在@synchronized (_testArray)这里,因为是对_testArray加锁,当_testArray发生变化之后,在后续的线程和之前的线程中加锁的对象已经发生了变化,即不同的线程取出的syncData不一样了,因为object已经不一样了,因此锁就失效了,引发了后续的崩溃问题。但是如果我们针对self进行加锁,就可以解决这个问题。