多线程

29 Nov 2019

demo

概念

单核多线程与多核多线程

iOS 中常见多线程方案

GCD

GCD中有2个用来执行任务的函数

GCD源码

GCD的队列

队列不等同于线程,队列用来存储待执行的任务,串行只能一个一个按顺序取,并行可以在前一个任务没有执行完的前提下同时执行下一个任务

容易混淆的术语

有4个术语比较容易混淆:同步、异步、并发、串行

各种队列的执行效果

队列组的使用

可以通过gcd的队列组实现以下功能

// 不使用队列组的实现,比较凌乱
dispatch_async(dispatch_get_global_queue(0, 0), ^{
    dispatch_sync(dispatch_get_global_queue(0, 0), ^{
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            for (int i=0; i < 10; i++)
            {
                NSLog(@"Misson: 1, %@", [NSThread currentThread]);
            }
        });
        
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            for (int i=0; i < 10; i++)
            {
                NSLog(@"Misson: 2, %@", [NSThread currentThread]);
            }
        });
    });
    
    dispatch_async(dispatch_get_main_queue(), ^{
        for (int i=0; i < 10; i++)
        {
            NSLog(@"Misson: 3, %@", [NSThread currentThread]);
        }
     });
});

// 使用队列组
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_queue_create(0,DISPATCH_QUEUE_CONCURRENT);

// 异步并发执行加到group中的block代码
dispatch_group_async(group, queue, ^{
    for (int i=0; i < 10; i++)
    {
        NSLog(@"Misson: 1, %@", [NSThread currentThread]);
    }
});

dispatch_group_async(group, queue, ^{
    for (int i=0; i < 10; i++)
    {
        NSLog(@"Misson: 2, %@", [NSThread currentThread]);
    }
});


// 等group中的所有任务执行完之后通知执行下面的代码,也是async的
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
    for (int i=0; i < 10; i++)
    {
        NSLog(@"Misson: 3, %@", [NSThread currentThread]);
    }
});

多线程的安全隐患

案例

多线程安全隐患分析

解决方案

使用线程同步技术(同步,就是协同步调,按预定的先后次序进行) 常见的线程同步技术是: 加锁

iOS中的线程同步方案

OSSpinLock

os_unfair_lock

pthread_mutex

    // 初始化锁的属性
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    /*
     #define PTHREAD_MUTEX_NORMAL        0
     #define PTHREAD_MUTEX_ERRORCHECK    1
     #define PTHREAD_MUTEX_RECURSIVE        2
     #define PTHREAD_MUTEX_DEFAULT        PTHREAD_MUTEX_NORMAL
     */
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
    
    // 初始化锁
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex, &attr);
    
    // 销毁相关资源
    pthread_mutexattr_destroy(&attr);
    pthread_mutex_destroy(&mutex);

普通的锁如果对同一段代码重复加锁,会陷入锁死状态,比如下面这样

- (void)otherTest1
{
    // 加锁
    pthread_mutex_lock(&_mutex);
    NSLog(@"%s", __func__);
    static int count = 0;
    if (count < 10)
    {
        count++;
        [self otherTest1];
    }
    // 解锁
    pthread_mutex_unlock(&_mutex);
}

递归锁: 同一个线程中可以重复加锁的锁,设置以下mutex的属性即可

// 初始化锁的属性
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
// 递归锁
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
// 初始化锁
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &attr);

条件锁:某件事发生需要满足一个条件之后才能继续执行,可以使用条件锁
比如卖东西和生产东西,当东西售罄之后,需要等待生产好商品才能继续销售商品。

// 初始化条件, NULL代表默认属性
pthread_cond_init(&_cond, NULL);
// 解锁,并进入条件等待,等收到信号并且可以重新加锁才会继续执行
pthread_cond_wait(&_cond, &_mutex);
// 激活一个等待该条件的线程
pthread_cond_signal(&_cond);
// 激活所有等待该条件的线程
pthread_cond_broadcast(&_cond);
// 销毁资源
pthread_cond_destroy(&_cond);

自旋锁 & 互斥锁

自旋锁是高级锁,一旦发现已经被锁了就会进入while循环,占有cpu资源,直到可以重新上锁 互斥锁是低级锁,一旦发现已经被锁了就会进入睡眠状态,等待可以重新上锁再唤醒 OSSpinLock自旋锁os_unfair_lock,pthread_mutex互斥锁

可以在即将加锁的代码中打断点,查看加锁不成功的汇编来验证 OSSpinLock

上面的代码进入while循环

os_unfair_lock

最后调用callsys进入睡眠等待状态。 pthread_mutex也是类似的情况

NSLock、NSRecursiveLock

NSLock是对mutex普通锁的封装 NSRecursiveLock是对mutex递归锁的封装,API跟NSLock基本一致

@protocol NSLocking
- (void)lock;
- (void)unlock;
@end

@interface NSLock : NSObject <NSLocking> {
- (BOOL)tryLock;
- (BOOL)lockBeforeDate:(NSDate *)limit;

@interface NSRecursiveLock : NSObject <NSLocking> {
- (BOOL)tryLock;
- (BOOL)lockBeforeDate:(NSDate *)limit;

NSCondition

NSCondition是对mutexcond的封装

@interface NSCondition : NSObject <NSLocking> {
- (void)wait;
- (BOOL)waitUntilDate:(NSDate *)limit;
- (void)signal;
- (void)broadcast;

NSConditionLock

NSConditionLock是对NSCondition的进一步封装,可以设置具体的条件值

@interface NSConditionLock : NSObject <NSLocking> {
- (instancetype)initWithCondition:(NSInteger)condition;
@property (readonly) NSInteger condition;
- (void)lockWhenCondition:(NSInteger)condition;
- (BOOL)tryLock;
- (BOOL)tryLockWhenCondition:(NSInteger)condition;
- (void)unlockWithCondition:(NSInteger)condition;
- (BOOL)lockBeforeDate:(NSDate *)limit;
- (BOOL)lockWhenCondition:(NSInteger)condition beforeDate:(NSDate *)limit;

比如下面这样

- (void)otherTest1
{
    [[[NSThread alloc] initWithTarget:self selector:@selector(__three) object:nil] start];
    
    [[[NSThread alloc] initWithTarget:self selector:@selector(__two) object:nil] start];
    
    [[[NSThread alloc] initWithTarget:self selector:@selector(__one) object:nil] start];
}

- (void)__one
{
    [self.conditionLock lockWhenCondition:1];
    sleep(1);
    NSLog(@"%s", __func__);
    [self.conditionLock unlockWithCondition:2];
}

- (void)__two
{
    [self.conditionLock lockWhenCondition:2];
    sleep(1);
    NSLog(@"%s", __func__);
    [self.conditionLock unlockWithCondition:3];
}

- (void)__three
{
    [self.conditionLock lockWhenCondition:3];
    sleep(1);
    NSLog(@"%s", __func__);
    [self.conditionLock unlockWithCondition:4];
}

可以设置线程依赖,不管按什么顺序调用,方法会按照__one,__two,__three的顺序执行。

dispatch_queue

直接使用GCD的串行队列,也是可以实现线程同步的

// 创建串行队列
dispatch_queue_t queue = dispatch_queue_create("theQueue", DISPATCH_QUEUE_SERIAL);
// 把要保证线程安全的任务丢到队列中去执行
dispatch_sync(queue, ^{
    // 任务
});

dispatch_semaphore

semaphore叫做”信号量” 信号量的初始值,可以用来控制线程并发访问的最大数量 信号量的初始值为1,代表同时只允许1条线程访问资源,保证线程同步

// 设置信号量的初始值,用来控制最大并发量,这里最大并发量是5
dispatch_semaphore_t semaphore = dispatch_semaphore_create(5);
// 设置为1, 保证没有只有一个线程可以访问
dispatch_semaphore_t singleSemaphore = dispatch_semaphore_create(1);
// 如果信号量的值<=0, 当前线程就会进入休眠等待(直到信号量的值>0)
// 如果信号量的值>0,就减1,然后往下执行后面的代码
dispatch_semaphore_wait(singleSemaphore, DISPATCH_TIME_FOREVER);
需要保证线程同步的代码 ...
// 让信号量的值加1
dispatch_semaphore_signal(singleSemaphore);

@synchronized

@synchronized是对os_unfair_recursive_lock递归锁的封装(旧的版本是对mutex递归锁的封装)
源码查看:objc4中的objc-sync.mm文件 @synchronized(obj)内部会生成obj对应的递归锁,然后进行加锁、解锁操作

在调用@synchronized处打断点

查看汇编代码,可以看到在同步开始处调用了objc_sync_enter,在结束处调用了 objc_sync_exit

查看objc4源码

// Begin synchronizing on 'obj'. 
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.  
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;

    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        assert(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }

    return result;
}

// End synchronizing on 'obj'. 
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    
    if (obj) {
        SyncData* data = id2data(obj, RELEASE); 
        if (!data) {
            result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        } else {
            bool okay = data->mutex.tryUnlock();
            if (!okay) {
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            }
        }
    } else {
        // @synchronized(nil) does nothing
    }
	

    return result;
}

typedef struct alignas(CacheLineSize) SyncData {
    struct SyncData* nextData;
    DisguisedPtr<objc_object> object;
    int32_t threadCount;  // number of THREADS using this block
    recursive_mutex_t mutex;
} SyncData;

using recursive_mutex_t = recursive_mutex_tt<LOCKDEBUG>;

template <bool Debug>
class recursive_mutex_tt : nocopy_t {
    os_unfair_recursive_lock mLock;

  public:
    constexpr recursive_mutex_tt() : mLock(OS_UNFAIR_RECURSIVE_LOCK_INIT) {
        lockdebug_remember_recursive_mutex(this);
    }

    constexpr recursive_mutex_tt(const fork_unsafe_lock_t unsafe)
        : mLock(OS_UNFAIR_RECURSIVE_LOCK_INIT)
    { }

    void lock()
    {
        lockdebug_recursive_mutex_lock(this);
        os_unfair_recursive_lock_lock(&mLock);
    }

    void unlock()
    {
        lockdebug_recursive_mutex_unlock(this);

        os_unfair_recursive_lock_unlock(&mLock);
    }

    void forceReset()
    {
        lockdebug_recursive_mutex_unlock(this);

        bzero(&mLock, sizeof(mLock));
        mLock = os_unfair_recursive_lock OS_UNFAIR_RECURSIVE_LOCK_INIT;
    }

    bool tryUnlock()
    {
        if (os_unfair_recursive_lock_tryunlock4objc(&mLock)) {
            lockdebug_recursive_mutex_unlock(this);
            return true;
        }
        return false;
    }

    void assertLocked() {
        lockdebug_recursive_mutex_assert_locked(this);
    }

    void assertUnlocked() {
        lockdebug_recursive_mutex_assert_unlocked(this);
    }
};

一进入objc_sync_enter根据传进去的obj地址生成一个锁,并把它存在一个哈希表中,再用这个锁进行加锁 解锁的时候objc_sync_exit根据原来传进的obj地址查找对应的锁,并移除哈希表中的键值对,再用这个锁来解锁

Atomic

atomic用于保证属性setter、getter的原子性操作,相当于在gettersetter内部加了线程同步的锁
可以参考源码objc4objc-accessors.mm
它并不能保证使用属性的过程是线程安全的

id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) {
    if (offset == 0) {
        return object_getClass(self);
    }

    // Retain release world
    id *slot = (id*) ((char*)self + offset);
    if (!atomic) return *slot;
        
    // Atomic retain release world
    spinlock_t& slotlock = PropertyLocks[slot];
    slotlock.lock();
    id value = objc_retain(*slot);
    slotlock.unlock();
    
    // for performance, we (safely) issue the autorelease OUTSIDE of the spinlock.
    return objc_autoreleaseReturnValue(value);
}


static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy) __attribute__((always_inline));

static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy)
{
    if (offset == 0) {
        object_setClass(self, newValue);
        return;
    }

    id oldValue;
    id *slot = (id*) ((char*)self + offset);

    if (copy) {
        newValue = [newValue copyWithZone:nil];
    } else if (mutableCopy) {
        newValue = [newValue mutableCopyWithZone:nil];
    } else {
        if (*slot == newValue) return;
        newValue = objc_retain(newValue);
    }

    if (!atomic) {
        oldValue = *slot;
        *slot = newValue;
    } else {
        spinlock_t& slotlock = PropertyLocks[slot];
        slotlock.lock();
        oldValue = *slot;
        *slot = newValue;        
        slotlock.unlock();
    }

    objc_release(oldValue);
}

注意:由于atomic熟悉会给get和set方法使用不同的锁,所以不能保证线程安全

iOS线程同步方案性能比较

考虑到(1,2)有iOS版本兼容的问题,可以考虑优先使用(3,4)

自旋锁、互斥锁比较

iOS中的读写安全方案

通常涉及到文件IO操作时,为了保证数据安全和效率,一般是”多读单写”,也就是:

iOS的实现方案有

pthread_rwlock

等待锁的线程会进入休眠

// 初始化锁
pthread_rwlock_t lock;
pthread_rwlock_init(&lock, NULL);

// 读-加锁
pthread_rwlock_rdlock(&lock);
// 读-尝试加锁
pthread_rwlock_tryrdlock(&lock);

// 写-加锁
pthread_rwlock_wrlock(&lock);
// 写-尝试加锁
pthread_rwlock_trywrlock(&lock);

// 解锁
pthread_rwlock_unlock(&lock);

// 销毁
pthread_rwlock_destroy(&lock);

dispatch_barrier_async

这个函数传入的并发队列必须是自己通过dispatch_queue_create创建的 如果传入的是一个串行或是一个全局的并发队列,那这个函数便等同于dispatch_async函数的效果

Calls to this function always return immediately after the block has been submitted and never wait for the block to be invoked. When the barrier block reaches the front of a private concurrent queue, it is not executed immediately. Instead, the queue waits until its currently executing blocks finish executing. At that point, the barrier block executes by itself. Any blocks submitted after the barrier block are not executed until the barrier block completes. The queue you specify should be a concurrent queue that you create yourself using the dispatch_queue_create function. If the queue you pass to this function is a serial queue or one of the global concurrent queues, this function behaves like the dispatch_async function.

// 初始化队列
dispatch_queue_t queue = dispatch_queue_create("queue",DISPATCH_QUEUE_CONCURRENT);
// 读
dispatch_async(queue, ^{
    
});
// 写
dispatch_barrier_async(queue, ^{
    
});

dispatch_barrier 和 dispatch_group 的比较

GNUstep

GNUstep是GNU计划的项目之一,它将Cocoa的OC库重新开源实现了一遍

源码地址:http://www.gnustep.org/resources/downloads.php

虽然GNUstep不是苹果官方源码,但还是具有一定的参考价值




reference:
GNUstep objc4