iOS进阶之路 (十七)多线程 - 锁的底层原理和使用

2,935 阅读27分钟

现在操作系统基本都是多任务操作系统,即同时有大量可调度实体在运行。在多任务操作系统中,同时运行的多个任务可能:

  • 都需要访问/使用同一种资源
  • 多个任务之间有依赖关系,某个任务的运行依赖于另一个任务。

同步:是指散步在不同任务之间的若干程序片段,它们的运行必须严格按照规定的某种先后次序。最基本的场景就是:多个线程在运行过程中协同步调,按照预定的先后次序运行。比如A任务的运行依赖于B任务产生的数据。

互斥:是指散步在不同任务之间的若干程序片段,当某个任务运行其中一个程序片段时,其他任务就不能运行它们之间的任一程序片段,直到该任务运行完毕。最基本的场景就是:一个公共资源同一时刻只能被一个进程使用。

我们可以使用锁来解决多线程的同步和互斥问题,基本的锁包括三类:互斥锁 自旋锁 读写锁, 其他的比如条件锁 递归锁 信号量都是上层的封装和实现。

一. 互斥锁

互斥锁是一种用于多线程编程中,防止两条线程同时对同一公共资源(比 如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区而达成。

互斥锁可以分为 递归锁(recursive mutex)非递归锁(non-recursive mutex)。二者唯一的区别是,同一个线程可以多次获取同一个递归锁,不会产生死锁。而如果一个线程多次获取同一个非递归锁,则会产生死锁。

  1. 互斥锁的特点:
  • 原子性:如果一个线程锁定了一个互斥量,没有其他线程在同一时间可以成功锁定这个互斥量;
  • 唯一性:如果一个线程锁定了一个互斥量,在它解除锁定之前,没有其他线程可以锁定这个互斥量;
  • 非繁忙等待:如果一个线程锁定了一个互斥量,第二个线程又试图去锁定这个互斥量,则第二个线程将被挂起(不占用任何cpu资源),直到第一个线程解除对这个互斥量的锁定为止,第二个线程则被唤醒并继续执行,同时锁定这个互斥量。
  1. 互斥锁的工作流程:
  • 在访问共享资源后临界区域前,对互斥锁进行加锁;
  • 对互斥锁进行加锁后,任何其他试图再次对互斥锁加锁的线程将会被阻塞,直到锁被释放。
  • 在访问完成后释放互斥锁导上的锁;
  1. 常用的互斥锁
  • @synchronized
  • NSLock
  • NSRecursive

1.1 pthread_mutex

#include <pthread.h>
#include <time.h>
// 初始化一个互斥锁。
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

// 对互斥锁上锁,若互斥锁已经上锁,则调用者一直阻塞,直到互斥锁解锁后再上锁。
int pthread_mutex_lock(pthread_mutex_t *mutex);

// 调用该函数时,若互斥锁未加锁,则上锁,返回 0;若互斥锁已加锁,则函数直接返回失败,即 EBUSY。
int pthread_mutex_trylock(pthread_mutex_t *mutex);

// 当线程试图获取一个已加锁的互斥量时,pthread_mutex_timedlock 互斥量
// 允许绑定线程阻塞时间。即非阻塞加锁互斥量。
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict abs_timeout);

// 对指定的互斥锁解锁。
int pthread_mutex_unlock(pthread_mutex_t *mutex);

// 销毁指定的一个互斥锁。互斥锁在使用完毕后,必须要对互斥锁进行销毁,以释放资源。
int pthread_mutex_destroy(pthread_mutex_t *mutex);

对于 pthread_mutex 来说,比较重要的是锁的类型,摘自百度百科:

  • PTHREAD_MUTEX_NORMAL:不提供死锁检测。尝试重新锁定互斥锁会导致死锁。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或未锁定,则将产生不确定的行为。
  • PTHREAD_MUTEX_ERRORCHECK: 提供错误检查。如果某个线程尝试重新锁定的互斥锁已经由该线程锁定,则将返回错误。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或者未锁定,则将返回错误。
  • PTHREAD_MUTEX_RECURSIVE:该互斥锁会保留锁定计数这一概念。线程首次成功获取互斥锁时,锁定计数会设置为 1。线程每重新锁定该互斥锁一次,锁定计数就增加 1。线程每解除锁定该互斥锁一次,锁定计数就减小 1。 锁定计数达到 0 时,该互斥锁即可供其他线程获取。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或者未锁定,则将返回错误。
  • PTHREAD_MUTEX_DEFAULT: 尝试以递归方式锁定该互斥锁将产生不确定的行为。对于不是由调用线程锁定的互斥锁,如果尝试解除对它的锁定,则会产生不确定的行为。如果尝试解除锁定尚未锁定的互斥锁,则会产生不确定的行为。

1.2 @synchronized

一个便捷的创建互斥锁的方式,它做了其他互斥锁所做的所有的事情。

@synchronized(object) 指令使用的 object 为该锁的唯一标识,只有当标识相同时,才满足互斥。如果你在不同的线程中传过去的是一样的标识符,先获得锁的会锁定代码块,另一个线程将被阻塞,如果传递的是不同的标识符,则不会造成线程阻塞。

- (void)synchronized
{
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        @synchronized(self) {
            sleep(2);
            NSLog(@"线程1");
        }
        NSLog(@"线程1解锁成功");
    });
    
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        sleep(1);
        @synchronized(self) {
            NSLog(@"线程2");
        }
    });
}
打印:
2020-04-26 17:58:14.534038+0800 lock[3891:797979] 线程1
2020-04-26 17:58:14.534250+0800 lock[3891:797979] 线程1解锁成功
2020-04-26 17:58:14.534255+0800 lock[3891:797981] 线程2

1.2.1 @synchronized 原理

clang -x objective-c -rewrite-objc -isysroot /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator.sdk  main.m

@synchronized(obj)clang编译后的伪代码如下:

@try {
    objc_sync_enter(obj);
    // do work
} @finally {
    objc_sync_exit(obj);    
}

进入 objc4-756.2 源码

数据结构

typedef struct SyncData {
    id object;
    recursive_mutex_t mutex;
    struct SyncData* nextData;
    int threadCount;
} SyncData;

typedef struct SyncList {
    SyncData *data;
    spinlock_t lock;
} SyncList;

// Use multiple parallel lists to decrease contention among unrelated objects.
#define COUNT 16
#define HASH(obj) ((((uintptr_t)(obj)) >> 5) & (COUNT - 1))
#define LOCK_FOR_OBJ(obj) sDataLists[HASH(obj)].lock
#define LIST_FOR_OBJ(obj) sDataLists[HASH(obj)].data
static SyncList sDataLists[COUNT];

SyncData 结构体 :

  • 传入的 obj
  • obj 关联的 recursive_mutex_t 锁
  • 指向另一个 SyncData 对象的指针 nextData,所以可以把每个 SyncData 结构体看做是链表中的一个节点。
  • 每个 syncData 对象中的锁会被一些线程使用或等待,threadCount就是此时这些线程的数量。syncData结构体 会被缓存,threadCount= 0 代表这个syncData实例可以被复用.

SyncList 结构体:

  • SyncData 当做是链表中的节点,每个 SyncList 结构体都有个指向 SyncData 节点链表头部的指针,也有一个用于防止多个线程对此列表做并发修改的锁。

sDataLists 结构体数组:

  • 一个 SyncList 结构体数组,大小为16。通过定义的一个哈希算法将传入对象映射到数组上的一个下标。值得注意的是这个哈希算法设计的很巧妙,是将对象指针在内存的地址转化为无符号整型并右移五位,再跟 0xF 做按位与运算,这样结果不会超出数组大小。
  • LOCK_FOR_OBJ(obj) 和 LIST_FOR_OBJ(obj):先是哈希出对象的数组下标,然后取出数组对应元素的 lock 或 data。 LOCK_FOR_OBJ(obj)LIST_FOR_OBJ(obj)

当调用 objc_sync_enter(obj) 时,它用 obj 内存地址的哈希值查找合适的 SyncData,然后将其上锁。

当调用 objc_sync_exit(obj) 时,它查找合适的 SyncData 并将其解锁。

objc_sync_enter

// Begin synchronizing on 'obj'. 
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.  
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;

    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        assert(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }

    return result;
}

BREAKPOINT_FUNCTION(
    void objc_sync_nil(void)
);
  • 如果 obj = nil,@synchronized(nil) does nothing
  • 如果 obj 有值,runtime会为传入的 obj 分配一个 递归锁并存储在哈希表中
  • obj 通过 id2data(obj, ACQUIRE) 封装成 SyncData(obj)
  • 递归锁在被同一线程重复获取时不会产生死锁,所以递归锁配合 @synchronized(nil) 保证被同一线程重复获取时不会产生死锁。不过虽然 nil 不行,但 @synchronized([NSNull null]) 是可以的。

1.2.2 面试题

  1. 问题1: 下面的代码运行会发生什么?
- (void)synchronizedTest
{
    self.testArray = [NSMutableArray array];
    
    for (NSInteger i = 0; i < 200000; i ++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            _testArray = [NSMutableArray array];
        });
    }
}
  • _testArray在不同的线程中不断的 retain release,会存在某个时刻,多个线程同时对_testArray进行release,导致crash。
  1. 问题2:用 @synchronizing 锁住 _testArray,还会crash么?
- (void)synchronizedTest
{
    self.testArray = [NSMutableArray array];
    
    for (NSInteger i = 0; i < 200000; i ++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            @synchronized (_testArray) {
                _testArray = [NSMutableArray array];
            }
        });
    }
    
}
  • 上面我们学习了,@synchronized(nil) = do nothing,依然崩溃

被锁对象为nil时,@synchronized并不尽如人意,怎么才能解决问题呢?使用NSLock。

{
    self.testArray = [NSMutableArray array];
    
    NSLock *lock = [[NSLock alloc] init];
    
    for (NSInteger i = 0; i < 200000; i ++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [lock lock];
            _testArray = [NSMutableArray array];
            [lock unlock];
        });
    }
    
}

1.3 NSLock

NSLock 底层pthread_mutex_lock 实现的, 属性为 PTHREAD_MUTEX_ERRORCHECK。遵循 NSLocking 协议。

@protocol NSLocking

- (void)lock;   
- (void)unlock;

@end

@interface NSLock : NSObject <NSLocking> {
@private
    void *_priv;
}

- (BOOL)tryLock;
- (BOOL)lockBeforeDate:(NSDate *)limit;

@property (nullable, copy) NSString *name API_AVAILABLE(macos(10.5), ios(2.0), watchos(2.0), tvos(9.0));

@end
  • lock:加锁
  • unlock:解锁
  • tryLock:尝试加锁,如果失败的话返回 NO
  • lockBeforeDate: 在指定Date之前尝试加锁,如果在指定时间之前都不能加锁,则返回NO
- (void)nslock
{
    NSLock *lock = [[NSLock alloc] init];
    //线程1
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
            [lock lock];
            NSLog(@"线程1");
            sleep(2);
            [lock unlock];
            NSLog(@"线程1解锁成功");
    });

    //线程2
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
            sleep(1);//以保证让线程2的代码后执行
            [lock lock];
            NSLog(@"线程2");
            [lock unlock];
    });
}
打印:
2020-04-26 20:27:36.474376+0800 lock[6554:889229] 线程1
2020-04-26 20:27:38.474856+0800 lock[6554:889229] 线程1解锁成功
2020-04-26 20:27:38.474880+0800 lock[6554:889230] 线程2
  • 线程 1 中的 lock 锁上了,所以线程 2 中的 lock 加锁失败,阻塞线程 2,但 2 s 后线程 1 中的 lock 解锁,线程 2 就立即加锁成功,执行线程 2 中的后续代码。

1.4 NSRecursiveLock

  1. NSRecursiveLock 的底层是通过 pthread_mutex_lock 实现的,属性为 PTHREAD_MUTEX_RECURSIVE
  2. NSRecursiveLockNSLock 的区别在于:NSRecursiveLock 可以在 同一个线程 中重复加锁,NSRecursiveLock 会记录上锁和解锁的次数,当二者平衡的时候,才会释放锁,其它线程才可以上锁成功。
@protocol NSLocking

- (void)lock;   
- (void)unlock;

@end

@interface NSRecursiveLock : NSObject <NSLocking> {
@private
    void *_priv;
}

- (BOOL)tryLock;
- (BOOL)lockBeforeDate:(NSDate *)limit;

@property (nullable, copy) NSString *name NS_AVAILABLE(10_5, 2_0);

@end
  1. 应用场景
- (void)recursiveLock
{
    NSRecursiveLock *lock = [[NSRecursiveLock alloc] init];
    
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        
        static void (^testMethod)(int);
        
        testMethod = ^(int value) {
            [lock lock];
            if (value > 0) {
                NSLog(@"current value = %d", value);
                testMethod(value - 1);
            }
            [lock unlock];
        };
        
        testMethod(10);
    });
}
打印:
2020-04-26 21:40:24.390756+0800 lock[6691:924076] current value = 10
2020-04-26 21:40:24.390875+0800 lock[6691:924076] current value = 9
2020-04-26 21:40:24.390956+0800 lock[6691:924076] current value = 8
2020-04-26 21:40:24.391043+0800 lock[6691:924076] current value = 7
2020-04-26 21:40:24.391131+0800 lock[6691:924076] current value = 6
2020-04-26 21:40:24.391211+0800 lock[6691:924076] current value = 5
2020-04-26 21:40:24.391295+0800 lock[6691:924076] current value = 4
2020-04-26 21:40:24.391394+0800 lock[6691:924076] current value = 3
2020-04-26 21:40:24.391477+0800 lock[6691:924076] current value = 2
2020-04-26 21:40:24.391561+0800 lock[6691:924076] current value = 1
  • 上面的示例,如果用 NSLock 的话,lock 先上锁,但未执行解锁的时候,就会进入递归的下一层再次请求上锁,阻塞了该线程,线程被阻塞了,自然后面的解锁代码不会执行,而形成了死锁。而 NSRecursiveLock 递归锁就是为了解决这个问题。

1.5 互斥锁总结

对于 @synchronized NSLock NSRecursiveLock 应用场景的个人拙见,如果有问题请各位大佬指正:

  • 普通的线程安全场景,使用 NSLock 即可
  • 同一线程递归,使用 NSRecursiveLock
  • 多线程递归,更多的关注死锁现象,建议使用 @synchronized (本质是对递归锁的封装,但能够防止一些死锁, 使用时注意被锁对象不能为nil)

例如下面的代码,在 for循环 中不断创建线程,在各自的线程中又不断 递归 ,这种多线程+递归的情况下,使用@synchronized加锁。

- (void)test
{
    for (int i= 0; i<100; i++) {
        
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
           
            static void (^testMethod)(int);
            
            testMethod = ^(int value){
                
                @synchronized (self) {
                    if (value > 0) {
                      NSLog(@"current value = %d",value);
                      testMethod(value - 1);
                    }
                }
                
            };
            testMethod(10);
        });
    }
}

二. 自旋锁

线程反复检查锁变量是否可用。由于线程在这一过程中保持执行, 因此是一种忙等。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。 自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。

  1. 自旋锁与互斥锁功能一样,唯一不同的就是:
  • 互斥锁阻塞后休眠让出cpu,
  • 自旋锁阻塞后不会让出cpu,会一直忙等((busy-wait)待,直到得到锁。
  1. 应用场景:
  • 在用户态使用的比较少,在内核使用的比较多
  • 锁的持有时间比较短,或者说小于2次上下文切换的时间。
  1. 自旋锁的API和互斥锁相似,把 pthread_mutex_xxx()mutex 换成 spin,如:pthread_spin_init()

  2. 自旋锁目前已不安全,可能会出现优先级翻转问题。假设有三个准备执行的任务A、B、C 和 需要互斥访问的共享资源S,三个任务的优先级依次是 A > B > C;

  • 首先:C处于运行状态,获得CPU正在执行,同时占有了资源S;
  • 其次:A进入就绪状态,因为优先级比C高,所以获得CPU,A转为运行状态;C进入就绪状态;
  • 第三:执行过程中需要使用资源,而这个资源又被等待中的C占有的,于是A进入阻塞状态,C回到运行状态;
  • 第四:此时B进入就绪状态,因为优先级比C高,B获得CPU,进入运行状态;C又回到就绪状态;
  • 第五:如果这时又出现B2,B3等任务,他们的优先级比C高,但比A低,那么就会出现高优先级任务的A不能执行,反而低优先级的B,B2,B3等任务可以执行的奇怪现象,而这就是优先反转。

atomic 底层原理

说到自旋锁,不得不提属性修饰符 atomic。

1. setter方法底层原理 -- reallySetProperty

void objc_setProperty(id self, SEL _cmd, ptrdiff_t offset, id newValue, BOOL atomic, signed char shouldCopy) {
    bool copy = (shouldCopy && shouldCopy != MUTABLE_COPY);
    bool mutableCopy = (shouldCopy == MUTABLE_COPY);
    reallySetProperty(self, _cmd, newValue, offset, atomic, copy, mutableCopy);
}

void objc_setProperty_atomic(id self, SEL _cmd, id newValue, ptrdiff_t offset) {
    reallySetProperty(self, _cmd, newValue, offset, true, false, false);
}

void objc_setProperty_nonatomic(id self, SEL _cmd, id newValue, ptrdiff_t offset) {
    reallySetProperty(self, _cmd, newValue, offset, false, false, false);
}

void objc_setProperty_atomic_copy(id self, SEL _cmd, id newValue, ptrdiff_t offset) {
    reallySetProperty(self, _cmd, newValue, offset, true, true, false);
}

void objc_setProperty_nonatomic_copy(id self, SEL _cmd, id newValue, ptrdiff_t offset) {
    reallySetProperty(self, _cmd, newValue, offset, false, true, false);
}
static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy) 
{
    if (offset == 0) {
        object_setClass(self, newValue);
        return;
    }

    id oldValue;
    id *slot = (id*) ((char*)self + offset);

    if (copy) {
        newValue = [newValue copyWithZone:nil];
    } else if (mutableCopy) {
        newValue = [newValue mutableCopyWithZone:nil];
    } else {
        if (*slot == newValue) return;
        newValue = objc_retain(newValue);
    }

    if (!atomic) {
        // 直接替换
        oldValue = *slot;
        *slot = newValue;
    } else {
        // 加锁替换
        spinlock_t& slotlock = PropertyLocks[slot];
        slotlock.lock();
        oldValue = *slot;
        *slot = newValue;        
        slotlock.unlock();
    }

    objc_release(oldValue);
}
  • 如果属性是非原子属性的:直接 newValue 替换 oldValue
  • 如果属性是原子属性的:创建一个 spinlock_t 类型的锁,并给锁加盐。在锁环境下 newValue 替换 oldValue

2. getter方法底层原理 -- objc_getProperty

id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) {
    if (offset == 0) {
        return object_getClass(self);
    }

    // Retain release world
    id *slot = (id*) ((char*)self + offset);
    if (!atomic) return *slot;
        
    // Atomic retain release world
    spinlock_t& slotlock = PropertyLocks[slot];
    slotlock.lock();
    id value = objc_retain(*slot);
    slotlock.unlock();
    
    // for performance, we (safely) issue the autorelease OUTSIDE of the spinlock.
    return objc_autoreleaseReturnValue(value);
}
  • 如果是非原子属性的,直接返回盐地址下的值
  • 如果是原子属性的,在锁环境下取值

3. spinlock_t& slotlock = PropertyLocks[slot] 到底是什么类型的锁?

spinlock_t 看名字很像自旋锁,但是自旋锁已经不安全了。来看下 spinlock_t 的定义

using spinlock_t = mutex_tt<DEBUG>;
using mutex_locker_t = mutex_tt<LOCKDEBUG>::locker;

看来,苹果在底层使用 mutex_locker_t 替换了 spinlock_tmutex_locker_t 又是什么?

/*!
 * @typedef os_unfair_lock
 *
 * @abstract
 * Low-level lock that allows waiters to block efficiently on contention.
 *
 * In general, higher level synchronization primitives such as those provided by
 * the pthread or dispatch subsystems should be preferred.
 *
 * The values stored in the lock should be considered opaque and implementation
 * defined, they contain thread ownership information that the system may use
 * to attempt to resolve priority inversions.
 *
 * This lock must be unlocked from the same thread that locked it, attemps to
 * unlock from a different thread will cause an assertion aborting the process.
 *
 * This lock must not be accessed from multiple processes or threads via shared
 * or multiply-mapped memory, the lock implementation relies on the address of
 * the lock value and owning process.
 *
 * Must be initialized with OS_UNFAIR_LOCK_INIT
 *
 * @discussion
 * Replacement for the deprecated OSSpinLock. Does not spin on contention but
 * waits in the kernel to be woken up by an unlock.
 *
 * As with OSSpinLock there is no attempt at fairness or lock ordering, e.g. an
 * unlocker can potentially immediately reacquire the lock before a woken up
 * waiter gets an opportunity to attempt to acquire the lock. This may be
 * advantageous for performance reasons, but also makes starvation of waiters a
 * possibility.
 */
OS_UNFAIR_LOCK_AVAILABILITY
typedef struct os_unfair_lock_s {
    uint32_t _os_unfair_lock_opaque;
} os_unfair_lock, *os_unfair_lock_t;

还是要赞叹下苹果官方注释,太详细了。

  • os_unfair_lock 是一种低级锁,必须在 OS_UNFAIR_LOCK_INIT 下初始化。
  • 一般来说,应该首选更高级别的同步工具,如 pthread 或 dispatch 子系统提供的同步工具。
  • 锁里面包含线程所有权信息,用来解决优先级反转问题
  • 该锁必须从锁定它的 同一线程 解除锁定,尝试从其他线程解除锁定将导致断言中止进程。
  • 不能通过共享或多重映射内存从 多个进程或线程 访问此锁,锁的实现依赖于锁值和所属进程的地址。
  • 用来代替废弃的 OSSpinLock(iOS 10废弃)。
  • 出于性能的考虑,解锁器可能会在醒来之前立即重新获取锁。

4. atomic 一定是线程安全的么?

atomic 会对属性的 setter方法 、getter方法 分别加锁,生成了原子性的 setter、getter。这里的原子性也就意味着:假设当前有两个线程,线程A执行 getter 方法的时候,线程B如果想要执行 setter 方法,必须要等到getter方法执行完毕之后才能执行。

简而言之,atomic只能保证代码进入 getter 或者 setter 函数内部是安全的,一旦出现了同时getter 和 setter,多线程只能靠程序员自己保证。所以atomic属性和使用@property的多线程安全没有直接的联系。

举个例子:线程A 和 线程B 都对属性 num 执行10000次 + 1 操作。如果线程安全的话,程序运行结束后,num的值应该是20000。

@property (atomic, assign) NSInteger num;

- (void)atomicTest {
    //Thread A
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        for (int i = 0; i < 10000; i ++) {
            self.num = self.num + 1;
            NSLog(@"%@ -- %ld", [NSThread currentThread], (long)self.num);
        }
    });
    
    //Thread B
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        for (int i = 0; i < 10000; i ++) {
            self.num = self.num + 1;
             NSLog(@"%@ -- %ld", [NSThread currentThread], (long)self.num);
        }
    });
}
打印:
···
2020-04-28 16:35:55.126996+0800 lock[10384:1662304] <NSThread: 0x600000ea3c00>{number = 3, name = (null)} -- 19994
2020-04-28 16:35:55.127083+0800 lock[10384:1662299] <NSThread: 0x600000eecdc0>{number = 5, name = (null)} -- 19995
2020-04-28 16:35:55.127165+0800 lock[10384:1662304] <NSThread: 0x600000ea3c00>{number = 3, name = (null)} -- 19996
2020-04-28 16:35:55.127250+0800 lock[10384:1662299] <NSThread: 0x600000eecdc0>{number = 5, name = (null)} -- 19997
2020-04-28 16:35:55.127341+0800 lock[10384:1662304] <NSThread: 0x600000ea3c00>{number = 3, name = (null)} -- 19998

self.num = self.num + 1 方法:

  • 等号左边 self.num 调用 setter 方法,是原子属性的
  • 等号右边 self.num 调用 getter 方法,是原子属性的
  • 但是 self.num + 1 不是原子属性的啊,还是会出现线程问题。

另外,atomic由于要锁住该属性,因此它会消耗更多的资源,性能会很低,要比 nonatomic 慢20倍。所以iOS移动端开发,我们一般使用nonatomic。但是在mac开发中,atomic就有意义了。

三. 读写锁

读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑CPU数。写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者。

  1. 读写锁与互斥锁类似,不过读写锁允许更改的并行性,也叫共享互斥锁
  • 互斥锁要么是锁住状态,要么就是不加锁状态,而且一次只有一个线程可以对其加锁。
  • 读写锁可以有3种状态:读模式下加锁状态写模式加锁状态不加锁状态
  1. 读写锁的特点: 多读单写
  • 一次只有一个线程可以占有写模式的读写锁, 但是可以有多个线程同时占有读模式的读写锁. 正是因为这个特性,
  • 当读写锁是写加锁状态时, 在这个锁被解锁之前, 所有试图对这个锁加锁的线程都会被阻塞.
  • 当读写锁在读加锁状态时, 所有试图以读模式对它进行加锁的线程都可以得到访问权, - 但是如果线程希望以写模式对此锁进行加锁, 它必须直到所有的线程释放锁.
  1. 读写锁的API:
#include <pthread.h>
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr); 

// 申请读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock ); 

// 申请写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock ); 

// 尝试以非阻塞的方式来在读写锁上获取写锁。如果有任何的读者或写者持有该锁,则立即失败返回。
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); 

// 解锁
int pthread_rwlock_unlock (pthread_rwlock_t *rwlock); 

// 销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
  1. 应用场景:
  • 读写锁适合于对数据结构的读次数比写次数多得多的情况。
// 用于读写的并发队列:
@property (nonatomic, strong) dispatch_queue_t concurrent_queue;
// 用户数据中心, 可能多个线程需要数据访问:
@property (nonatomic, strong) NSMutableDictionary *dataCenterDic;

- (void)readWriteTest
{
    self.concurrent_queue = dispatch_queue_create("read_write_queue", DISPATCH_QUEUE_CONCURRENT);
    self.dataCenterDic = [NSMutableDictionary dictionary];
    
    dispatch_queue_t queue = dispatch_queue_create("com.akironer", DISPATCH_QUEUE_CONCURRENT);
    
    // 模拟多线程情况下写
    for (NSInteger i = 0; i < 5; i ++) {
        dispatch_async(queue, ^{
            [self ak_setObject:[NSString stringWithFormat:@"akironer--%ld", (long)i] forKey:@"Key"];
        });
    }
    
    // 模拟多线程情况下读
    for (NSInteger i = 0; i < 20; i ++) {
        dispatch_async(queue, ^{
            [self ak_objectForKey:@"Key"];
        });
    }
    
    // 模拟多线程情况下写
    for (NSInteger i = 0; i < 10; i ++) {
        dispatch_async(queue, ^{
            [self ak_setObject:[NSString stringWithFormat:@"iOS--%ld", (long)i] forKey:@"Key"];
        });
    }
}

#pragma mark - 读数据
- (id)ak_objectForKey:(NSString *)key {
    __block id obj;
    // 同步读取数据:
    dispatch_sync(self.concurrent_queue, ^{
        obj = [self.dataCenterDic objectForKey:key];
        NSLog(@"读:%@--%@", obj, [NSThread currentThread]);
        sleep(1);
    });
    return obj;
}

#pragma mark - 写数据
- (void)ak_setObject:(id)obj forKey:(NSString *)key {
    // 异步栅栏调用设置数据: 屏蔽同步
    dispatch_barrier_async(self.concurrent_queue, ^{
        [self.dataCenterDic setObject:obj forKey:key];
        NSLog(@"写:%@--%@", obj, [NSThread currentThread]);
        sleep(1);
    });
}

四. 条件锁

  1. 与互斥锁不同,条件锁是用来等待而不是用来上锁的。条件锁用来自动阻塞一个线程,直 到某特殊情况发生为止。通常条件锁和互斥锁一般同时使用。

  2. 条件锁是利用线程间共享的全局变量进行同步 的一种机制,使我们可以睡眠等待某种条件出现,主要包括两个动作:

  • 一个线程等待 "条件锁的条件成立" 而挂起;
  • 另一个线程使 “条件成立”(给出条件成立信号)。
  1. 条件锁的三要素:
  • 互斥锁:当检测条件时保护数据源,执行条件引发的任务
  • 条件变量:判断条件是否满足的依据
  • 条件探测变量:根据条件决定是否继续运行线程,即线程是否被阻塞

4.1 NSCondition 条件变量

  1. NSCondition 的底层通过 pthread_cond_t 实现的。NSCondition 的对象实际上作为一个锁和一个线程检查器
  • 锁:当检测条件时保护数据源,执行条件引发的任务;
  • 线程检查器:根据条件决定是否继续运行线程,即线程是否被阻塞
  1. NSCondition 实现了 NSLocking协议,当多个线程访问同一段代码时,会以 wait 为分水岭。一个线程等待另一个线程 unlock 之后,再走 wait 之后的代码。
@protocol NSLocking

- (void)lock;   
- (void)unlock;

@end

@interface NSCondition : NSObject <NSLocking> {
@private
    void *_priv;
}

- (void)wait;
- (BOOL)waitUntilDate:(NSDate *)limit;
- (void)signal;
- (void)broadcast;

@property (nullable, copy) NSString *name NS_AVAILABLE(10_5, 2_0);

@end
  • lock: 一般用于多线程同时访问、修改同一个数据源,保证在同一 时间内数据源只被访问、修改一次,其他线程的命令需要在lock 外等待,只到 unlock ,才可访问
  • unlock: 解锁
  • wait:让当前线程处于等待状态
  • signal:任意通知一个线程
  • broadcast:通知所有等待的线程
  1. 应用场景:生产者-消费者模式:
  • 生产模式下,商品数量 + 1
  • 消费模式下,商品数量 - 1
  • 如何保证消费模式下商品数量大于零呢?
- (void)testConditon
{
    self.testCondition = [[NSCondition alloc] init];
    //创建生产-消费者
    for (int i = 0; i < 10; i++) {
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self producer]; // 生产
        });
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self consumer]; // 消费
        });
        
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self consumer]; // 消费
        });
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self producer]; // 生产
        });
        
    }
}

- (void)producer
{
    [self.testCondition lock];
    self.ticketCount = self.ticketCount + 1;
    NSLog(@"生产一个 现有 count %zd",self.ticketCount);
    [self.testCondition signal];
    [self.testCondition unlock];
}

- (void)consumer
{
    // 线程安全
    [self.testCondition lock];

    while (self.ticketCount == 0) {
        NSLog(@"等待 count %zd",self.ticketCount);
        // 保证正常流程
        [self.testCondition wait];
    }
    
    //注意消费行为,要在等待条件判断之后
    self.ticketCount -= 1;
    NSLog(@"消费一个 还剩 count %zd ",self.ticketCount);
    [self.testCondition unlock];
}
打印:
2020-04-27 17:46:43.232762+0800 lock[7444:1140032] 生产一个 现有 count 1
2020-04-27 17:46:43.232900+0800 lock[7444:1140032] 生产一个 现有 count 2
2020-04-27 17:46:43.233001+0800 lock[7444:1140032] 消费一个 还剩 count 1 
2020-04-27 17:46:43.233109+0800 lock[7444:1140066] 消费一个 还剩 count 0 
2020-04-27 17:46:43.233209+0800 lock[7444:1140070] 等待 count 0
2020-04-27 17:46:43.233308+0800 lock[7444:1140030] 等待 count 0
2020-04-27 17:46:43.233406+0800 lock[7444:1140057] 等待 count 0
2020-04-27 17:46:43.233508+0800 lock[7444:1140058] 生产一个 现有 count 1
2020-04-27 17:46:43.233611+0800 lock[7444:1140070] 消费一个 还剩 count 0 
2020-04-27 17:46:43.233713+0800 lock[7444:1140059] 等待 count 0
2020-04-27 17:46:43.234100+0800 lock[7444:1140061] 生产一个 现有 count 1
2020-04-27 17:46:43.234343+0800 lock[7444:1140030] 消费一个 还剩 count 0 

4.2 NSConditionLock 条件锁

  1. NSConditionLock 借助 NSCondition 来实现,它的本质就是一个生产者-消费者模型。NSConditionLock 的内部持有一个 NSCondition 对象,以及 _condition_value 属性,在初始化时就会对这个属性进行赋值.

  2. NSConditionLock 实现了 NSLocking协议,一个线程会等待另一个线程 unlock 或者 unlockWithCondition: 之后再走 lock 或者 lockWhenCondition: 之后的代码。

  3. 相比于 NSCondition, NSConditonLock 自带一个条件探测变量,使用更加灵活。

@protocol NSLocking

- (void)lock;   
- (void)unlock;

@end

@interface NSConditionLock : NSObject <NSLocking> {
@private
    void *_priv;
}

- (instancetype)initWithCondition:(NSInteger)condition NS_DESIGNATED_INITIALIZER;

@property (readonly) NSInteger condition;
- (void)lockWhenCondition:(NSInteger)condition;
- (BOOL)tryLock;
- (BOOL)tryLockWhenCondition:(NSInteger)condition;
- (void)unlockWithCondition:(NSInteger)condition;
- (BOOL)lockBeforeDate:(NSDate *)limit;
- (BOOL)lockWhenCondition:(NSInteger)condition beforeDate:(NSDate *)limit;

@property (nullable, copy) NSString *name API_AVAILABLE(macos(10.5), ios(2.0), watchos(2.0), tvos(9.0));

@end
  • lock : 表示 xxx 期待获得锁,如果没有其他线程获得锁(不需要判断内部的 condition条件) 那它能执行此行以下代码,如果已经有其他线程获得锁(可能是条件锁,或者无条件 锁),则等待,直至其他线程解锁
  • condition:内部condition条件。这属性非常重要,外部condition条件内部condition条件 相同才会获取到 lock 对象;反之阻塞当前线程,直到condition相同
  • lockWhenCondition:(NSInteger)conditionA:表示在没有其他线程获得该锁的前提下,该锁 内部condition条件 不等于 条件A,不能获得锁,仍然等待。如果锁 内部condition 等于A条件,则进入代码区,同时设置它获得该锁,其他任何线程都将等待它代码 的完成,直至它解锁。
  • unlockWithCondition:(NSInteger)conditionA: 表示释放锁,同时把 内部condition条件 设置为A条件
  • return = lockWhenCondition:(NSInteger)conditionA beforeDate:(NSDate *)limitA:表示如果被锁定(没获得 锁),并超过 时间A 则不再阻塞线程。但是注意: 返回的值是NO, 它没有改变锁的状态,这个函数的目的在于可以实现两种状态下的处理
- (void)testConditonLock
{
    NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:2];
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
       [conditionLock lockWhenCondition:1];
       NSLog(@"线程 1");
       [conditionLock unlockWithCondition:0];
    });
    
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
       [conditionLock lockWhenCondition:2];
       NSLog(@"线程 2");
       [conditionLock unlockWithCondition:1];
    });
    
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
       [conditionLock lock];
       NSLog(@"线程 3");
       [conditionLock unlock];
    });
}
打印:
2020-04-27 18:00:21.876356+0800 lock[7484:1148383] 线程 3
2020-04-27 18:00:21.876629+0800 lock[7484:1148384] 线程 2
2020-04-27 18:00:21.876751+0800 lock[7484:1148386] 线程 1
  • 线程 1 调用 [NSConditionLock lockWhenCondition:1] ,此时因为不满足当前条件,所 以会进入 waiting 状态,当前进入到 waiting 时,会释放当前的互斥锁。
  • 此时线程 3 调用 [NSConditionLock lock],本质上是调用 [NSConditionLock lockBeforeDate:],这里不需要比对条件值,所以线程 3 会打印
  • 接下来线程 2 执行 [NSConditionLock lockWhenCondition:2],因为满足条件值,所以线程 2 会打印,打印完成后会调用 [NSConditionLock unlockWithCondition:1] 将 value 设置为 1,并发送 boradcast
  • 线程 1 接收到当前的信号,唤醒执行并打印。
  • 自此当前打印为 线程 3->线程 2 -> 线程 1。
  • [NSConditionLock lockWhenCondition:] 会根据传入的 condition 值和 Value 值进 行对比,如果不相等,这里就会阻塞,进入线程池,否则的话就继续代码执行
  • [NSConditionLock unlockWithCondition:] 会先更改当前的 value 值,然后进行广 播,唤醒当前的线程。

五. 信号量

信号量广泛用于进程或线程间的同步和互斥,信号量本质上是一个非负的整数计数器,它被用来控制对公共资源的访问。

#include <semaphore.h>
// 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);

// 信号量 P 操作(减 1)
int sem_wait(sem_t *sem);

// 以非阻塞的方式来对信号量进行减 1 操作
int sem_trywait(sem_t *sem);

// 信号量 V 操作(加 1)
int sem_post(sem_t *sem);

// 获取信号量的值
int sem_getvalue(sem_t *sem, int *sval);

// 销毁信号量
int sem_destroy(sem_t *sem);

GCD 的 dispatch_semaphore,可以参考iOS进阶之路 (十六)多线程 - GCD

六:总结

在 ibireme 大神的 不再安全的 OSSpinLock中,对各种锁的性能做了测试(加锁后立即解锁,并没有计算竞争时候的时间消耗)

  • OSSpinLock 性能最高,但它已经不再安全。
  • @synchronized 的效率最低,相信学习了本篇文章,@synchronized 不再是加锁的首先。

参考资料

Cooci -- iOS 中的八大锁

bestswifter -- 深入理解iOS开发中的锁

王令天下 -- 关于 @synchronized,这儿比你想知道的还要多