猪行天下之Python基础——9.2 Python多线程与多进程(中)

1,664 阅读24分钟

内容简述:

  • 1、threading模块详解
  • 2、queue模块详解

1、threading模块详解

Python提供的与线程操作相关的模块,网上有很多资料还是用的thread模块,在3.x版本中已经使用threading来替代thread,如果你在python 2.x版本想使用threading的话,可以使用dummy_threading模块


① threading模块提供的可直接调用的函数

  • active_count():获取当前活跃(alive)线程的个数。
  • current_thread():获取当前的线程对象。
  • get_ident():返回当前线程的索引,一个非零的整数(3.3新增)。
  • enumerate():获取当前所有活跃线程的列表。
  • main_thread():返回主线程对象(3.4新增)。
  • settrace(func):设置一个回调函数,在run()执行之前被调用。
  • setprofile(func):设置一个回调函数,在run()执行完毕之后调用。
  • stack_size():返回创建新线程时使用的线程堆栈大小。
  • threading.TIMEOUT_MAX:堵塞线程时间最大值,超过这个值会栈溢出。

② 线程局部变量(Thread-Local Data)

问题引入

在一个进程内所有的线程共享进程的全局变量,线程间共享数据很方便但是每个线程都可以随意修改全局变量,可能会引起线程安全问题。

解决方法

对于这种线程私有数据,最简单的方法就是对变量加锁或使用局部变量,只有线程自身可以访问,其他线程无法访问。除此之外还可以使用threading模块为我们提供的ThreadLocal变量,它本身是一个全局变量,但是线程们却可以使用它来保存私有数据。

用法简介

定义一个全局变量:data = thread.local(),然后就可以往里面存数据啦,比如data.num = xxx,但是有一点要注意:如果data里没有设置对应的属性,直接取会报AttributeError异常,使用时可以捕获这个异常或先调用hasattr(对象,属性)判断对象中是否有该属性!使用代码示例如下:

import threading
import random

data = threading.local()

def show(d):
    try:
        num = d.num
    except AttributeError:
        print("线程 %s 还未设置该属性!" % threading.current_thread().getName())
    else:
        print("线程 %s 中该属性的值为 = %s" % (threading.current_thread().getName(), num))

def thread_call(d):
    show(d)
    d.num = random.randint(1100)
    show(d)

if __name__ == '__main__':
    show(data)
    data.num = 666
    show(data)
    for i in range(2):
        t = threading.Thread(target=thread_call, args=(data,), name="Thread " + str(i))
        t.start()

运行结果如下

线程 MainThread 还未设置该属性!
线程 MainThread 中该属性的值为 = 666
线程 Thread 0 还未设置该属性!
线程 Thread 0 中该属性的值为 = 80
线程 Thread 1 还未设置该属性!
线程 Thread 1 中该属性的值为 = 17

不同线程访问这个ThreadLocal变量,返回的都是不一样的值,原理:

threading.local()实例化一个全局对象,这个全局对象里有一个大字典,键值为两个弱引用对象{线程对象,字典对象},然后可以通过current_thread()获得当前的线程对象,然后根据这个对象可以拿到对应的字典对象,然后进行参数的读或者写。


③ 线程对象(threading.Thread)

创建新线程的两种方式

  • 1.直接创建threading.Thread对象并把调用对象作为参数传入
  • 2.继承threading.Thread类重写run()方法

使用代码示例(验证单线程快还是多线程快):

import threading
import time

def catch_fish():
    pass

def one_thread():
    start_time = time.time()
    for i in range(11001):
        catch_fish()
    end_time = time.time()
    print("单线程测试 耗时 === %s" % str(end_time - start_time))

def muti_thread():
    start_time = time.time()
    for i in range(11001):
        threading.Thread(target=catch_fish()).start()
    end_time = time.time()
    print("多线程测试 耗时 === %s" % str(end_time - start_time))

if __name__ == '__main__':
    # 单线程
    threading.Thread(one_thread()).start()
    # 多线程
    muti_thread()

运行结果如下:

单线程测试 耗时 === 0.00011301040649414062
多线程测试 耗时 === 0.07665514945983887

从输出结果可以看到,多线程反而比单线程要慢,原因是前面介绍过的Python中的全局解释器锁(GIL), 使得任何时候仅有一个线程在执行。

Thread类构造函数

def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None)
:

构造函数参数依次是

  • group:线程组
  • target:要执行的函数
  • name:线程名字
  • args/kwargs:要传入的函数的参数
  • daemon:是否为守护线程

相关属性与函数

  • start():启动线程,只能调用一次
  • run():线程执行的操作,可继承Thread重写,参数可从args和kwargs获取;
  • join([timeout]):堵塞调用线程,直到被调用线程运行结束或超时;如果
    没设置超时时间会一直堵塞到被调用线程结束。
  • name/getName():获得线程名;
  • setName():设置线程名;
  • ident:线程是已经启动,未启动会返回一个非零整数;
  • is_alive():判断是否在运行,启动后,终止前;
  • daemon/isDaemon():线程是否为守护线程;
  • setDaemon():设置线程为守护线程;

④ Lock(指令锁)与RLock(可重入锁)

在概念那里就讲了,多个进程并发的访问临界资源可能会引起线程同步安全问题,写个简单的例子,然后再引入同步锁。代码示例如下:

import threading

file_name = "test.txt"

# 定义一个写入文件的方法
def write_to_file(msg):
    try:
        with open(file_name, "a+", encoding="utf-8"as f:
            f.write(msg + "\n")
    except OSError as reason:
        print(str(reason))

class MyThread(threading.Thread):
    def __init__(self, msg):
        super().__init__()
        self.msg = msg
    def run(self):
        write_to_file(self.name + "~" + self.msg)

if __name__ == '__main__':
    for i in range(121):
        t = MyThread(str(i)).start()

运行结果如下

# test.txt文件内容
Thread-1~1
Thread-5~5
Thread-3~3
Thread-2~2
Thread-4~4
Thread-6~6
Thread-7~7
Thread-8~8
Thread-10~10
Thread-9~9
Thread-11~11
Thread-13~13
Thread-12~12
Thread-14~14
Thread-15~15
Thread-16~16
Thread-17~17
Thread-19~19
Thread-20~20
Thread-18~18

发现结果并没有按照我们预想的1-20那样顺序打印,而是乱的,threading模块中提供了两个类来确保多线程共享资源的访问:「Lock」 和 「RLock」。

Lock指令锁,有两种状态(锁定与非锁定),以及两个基本函数:

使用acquire()设置为locked状态,使用release()设置为unlocked状态。acquire()函数有两个可选参数:blocking=True:是否堵塞当前线程等待;timeout=None:堵塞等待时间。如果成功获得lock,acquire返回True,否则返回False,超时也是返回False。使用起来也很简单,在访问共享资源的地方acquire一下,用完release就好。使用代码示例如下:

import threading

file_name = "test.txt"
lock = threading.Lock()

# 定义一个写入文件的方法(加锁)
def write_to_file(msg):
    if lock.acquire():
        try:
            with open(file_name, "a+", encoding="utf-8"as f:
                f.write(msg + "\n")
        except OSError as reason:
            print(str(reason))
        finally:
            lock.release()

class MyThread(threading.Thread):
    def __init__(self, msg):
        super().__init__()
        self.msg = msg
    def run(self):
        write_to_file(self.name + "~" + self.msg)

if __name__ == '__main__':
    for i in range(1101):
        t = MyThread(str(i)).start()

这里把循环次数改成了101,反复执行多次,test.txt中写入顺序也是正确的,加锁有效。另外有一点要注意:如果锁的状态是unlocked,此时调用release会抛出RuntimeError异常!

RLock可重入锁,和Lock类似,但RLock却可以被同一个线程请求多次! 举个例子:在一个线程里调用Lock对象的acquire方法两次。

lock.acquire()
lock.acquire()
lock.release()
lock.release()

你会发现程序卡住不动,因为已经发生了死锁,但是方法调用是在同一个线程里的,这很不合理吧。这个时候就可以引入RLock了,使用RLock编写一样代码,只需把threading.Lock()改成threading.RLock(),即可解决这个问题。

虽然使用RLock可以规避同一个线程引起的死锁问题,但是acquire和release函数要成对出现,即有多少个acquire就要有多少个release,才能够正真释放锁


⑤ 条件变量(Condition)

上面的互斥锁Lock和RLock只是最简单的同步机制,Python为我们提供了Condition(条件变量),以便于处理复杂线程同步问题,比如最经典的生产者与消费者问题。Condition除了提供与Lock类似的acquire()与release()函数外,还提供了wait()与notify()函数。

用法简介

  • 1.调用`threading.Condition`获得一个条件变量对象;
  • 2.线程调用acquire获得Condition对象
  • 3.进行条件判断,不满足条件调用wait函数,满足条件,进行一些处理改变条件后,调用notify函数通知处于wait状态的线程,重新进行条件判断。

代码示例如下(实现一个简单的消费者和生产者):

import threading
import time

condition = threading.Condition()
products = 0  # 商品数量

# 定义生产者线程类
class Producer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products >= 99:
                    condition.wait()
                else:
                    products += 2
                    print(self.name + "生产了2个产品,当前剩余产品数为:" + str(products))
                    condition.notify()
                condition.release()
                time.sleep(2)

# 定义消费者线程类
class Consumer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products < 4:
                    condition.wait()
                else:
                    products -= 4
                    print(self.name + "消耗了4个产品,当前剩余产品数为:" + str(products))
                    condition.notify()
            condition.release()
            time.sleep(2)

if __name__ == '__main__':
    # 创建五个生产者线程
    for i in range(5):
        p = Producer()
        p.start()
    # 创建两个消费者线程
    for j in range(2):
        c = Consumer()
        c.start()

部分运行结果如下

Thread-1生产了2个产品,当前剩余产品数为:2
Thread-2生产了2个产品,当前剩余产品数为:4
Thread-3生产了2个产品,当前剩余产品数为:6
Thread-4生产了2个产品,当前剩余产品数为:8
Thread-5生产了2个产品,当前剩余产品数为:10
Thread-6消耗了4个产品,当前剩余产品数为:6
Thread-7消耗了4个产品,当前剩余产品数为:2
Thread-1生产了2个产品,当前剩余产品数为:4
Thread-5生产了2个产品,当前剩余产品数为:6
Thread-3生产了2个产品,当前剩余产品数为:8
Thread-7消耗了4个产品,当前剩余产品数为:4
Thread-6消耗了4个产品,当前剩余产品数为:0
Thread-4生产了2个产品,当前剩余产品数为:2

Condition维护着一个互斥锁对象(默认是RLock),也可以自己实例化一个在Condition实例化的时候通过构造函数传入,所以,调用的Condition的acquire与release函数,其实调用就是这个锁对象的acquire与release函数。

Condition提供的其他函数

  • wait(timeout=None):释放锁,同时线程被挂起,直到收到通知被唤醒
    或超时(如果设置了timeout),当线程被唤醒并重新占有锁时,程序才继续执行;
  • wait_for(predicate, timeout=None):等待知道条件为True,predicate应该是
    一个回调函数,返回布尔值,timeout用于指定超时时间,返回值为回调函数返回
    的布尔值,或者超时,返回False(3.2新增);
  • notify(n=1):默认唤醒一个正在的等待线程,notify并不释放锁!!!
  • notify_all():唤醒所有等待线程,进入就绪状态,等待获得锁,notify_all 同样不释放锁!!!

注:上述函数只有在acquire之后才能调用,不然会报RuntimeError异常。


⑥ 信号量(Semaphore)

信号量,也是一个简单易懂的东西,举个形象的例子:

厕所里有五个坑位,每有个人去厕所就会占用一个坑位,所剩余的坑位-1,当五个坑都被人占满时,新来的人就只能在外面等候,直到有人出来为止。这里的五个坑位就是信号量,蹲坑的人就是线程,初始值为5,来人-1,走人+1,超过最大值,新来的处于堵塞状态,我们写下代码来还原这个过程。

信号量使用代码示例如下

import threading
import time
import random

s = threading.Semaphore(5)  # 粪坑

class Human(threading.Thread):
    def run(self):
        s.acquire()  # 占坑
        print("蹲坑 - " + self.name + " - " + str(time.ctime()))
        time.sleep(random.randrange(13))
        print("走人 - " + self.name + " - " + str(time.ctime()))
        s.release()  # 走人

if __name__ == '__main__':
    for i in range(10):
        human = Human()
        human.start()

运行结果如下

蹲坑 - Thread-1 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-2 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-3 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-4 - Tue Jul 17 19:59:15 2018
蹲坑 - Thread-5 - Tue Jul 17 19:59:15 2018
走人 - Thread-1 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-6 - Tue Jul 17 19:59:16 2018
走人 - Thread-2 - Tue Jul 17 19:59:16 2018
走人 - Thread-3 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-8 - Tue Jul 17 19:59:16 2018
走人 - Thread-5 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-7 - Tue Jul 17 19:59:16 2018
蹲坑 - Thread-9 - Tue Jul 17 19:59:16 2018
走人 - Thread-4 - Tue Jul 17 19:59:17 2018
蹲坑 - Thread-10 - Tue Jul 17 19:59:17 2018
走人 - Thread-6 - Tue Jul 17 19:59:17 2018
走人 - Thread-8 - Tue Jul 17 19:59:17 2018
走人 - Thread-9 - Tue Jul 17 19:59:17 2018
走人 - Thread-7 - Tue Jul 17 19:59:18 2018
走人 - Thread-10 - Tue Jul 17 19:59:19 2018

⑦ 通用的条件变量(Event)

Python提供的「用于线程间通信的信号标志」,一个线程标识了一个事件,其他线程处于等待状态,直到事件发生后,所有线程都会被激活。Event对象属性实现了简单的线程通信机制,提供了设置信号,清除信号,等待等用于实现线程间的通信。提供以下四个可供调用的方法:

  • is_set():判断内部标志是否为真
  • set():设置信号标志为真
  • clear():清除Event对象内部的信号标志(设置为false)
  • wait(timeout=None):使线程一直处于堵塞,知道标识符变为True

使用代码示例(汽车等红绿灯的例子):

import threading
import time
import random

class CarThread(threading.Thread):
    def __init__(self, event):
        threading.Thread.__init__(self)
        self.threadEvent = event
    def run(self):
        # 休眠模拟汽车先后到达路口时间
        time.sleep(random.randrange(110))
        print("汽车 - " + self.name + " - 到达路口...")
        self.threadEvent.wait()
        print("汽车 - " + self.name + " - 通过路口...")

if __name__ == '__main__':
    light_event = threading.Event()
    # 假设有20台车子
    for i in range(20):
        car = CarThread(event=light_event)
        car.start()
    while threading.active_count() > 1:
        light_event.clear()
        print("红灯等待...")
        time.sleep(3)
        print("绿灯通行...")
        light_event.set()
        time.sleep(2)

运行结果如下

红灯等待...
汽车 - Thread-10 - 到达路口...
汽车 - Thread-14 - 到达路口...
汽车 - Thread-9 - 到达路口...
汽车 - Thread-11 - 到达路口...
汽车 - Thread-12 - 到达路口...
绿灯通行...
汽车 - Thread-11 - 通过路口...
汽车 - Thread-10 - 通过路口...
汽车 - Thread-9 - 通过路口...
汽车 - Thread-14 - 通过路口...
汽车 - Thread-12 - 通过路口...
汽车 - Thread-6 - 到达路口...
汽车 - Thread-6 - 通过路口...

⑧ 定时器(Timer)

和Thread类似,只是要等待一段时间后才会开始运行,单位秒,用法也很简单,
代码示例如下

import threading
import time

def skill_ready():
    print("菜肴制作完成!!!")

if __name__ == '__main__':
    t = threading.Timer(5, skill_ready)
    t.start()
    while threading.active_count() > 1:
        print("======菜肴制作中======")
        time.sleep(1)

运行结果如下

======菜肴制作中======
======菜肴制作中======
======菜肴制作中======
======菜肴制作中======
======菜肴制作中======
菜肴制作完成!!!

⑨ 栅栏(Barrier)

Barrier直译栅栏,感觉不怎么好理解,我们可以把它看做是赛马用的栅栏,然后马(线程)依次来到栅栏前等待(wait),直到所有的马都停在栅栏面前了,然后所有马开始同时出发(start)。简单点说就是: 多个线程间的相互等待,调用了wait()方法的线程进入堵塞, 直到所有的线程都调用了wait()方法,然后所有线程同时进入就绪状态, 等待调度运行。

构造函数Barrier(parties,action=None,timeout=None)

参数解释

  • parties:创建一个可容纳parties条线程的栅栏;
  • action:全部线程被释放时可被其中一条线程调用的可调用对象;
  • timeout:线程调用wait()方法时没有显式设定timeout,就用的这个作为默认值;

相关属性与函数

  • wait(timeout=None):表示线程就位,返回值是一个0到parties-1之间的
    整数, 每条线程都不一样,这个值可以用作挑选一条线程做些清扫工作,另外如果
    你在构造函数里设置了action的话,其中一个线程在释放之前将会调用它。如果调用
    出错的话,会让栅栏进入broken状态,超时同样也会进入broken状态,如果栅栏在
    处于broke状态的时候调用reset函数,会抛出一个BrokenBarrierError异常。
  • reset():本方法将栅栏置为初始状态,即empty状态。所有已经在等待的线程都会
    接收到BrokenBarrierError异常,注意当有其他处于unknown状态的线程时,调用
    此方法将可能获取到额外的访问。因此如果一个栅栏进入了broken状态, 最好是
    放弃他并新建一个栅栏,而不是调用reset方法。
  • abort():将栅栏置为broken状态。本方法将使所有正在等待或将要调用
    wait()方法的线程收到BrokenBarrierError异常。本方法的使用情景为,比如:
    有一条线程需要abort(),又不想给其他线程造成死锁的状态,或许设定
    timeout参数要比使用本方法更可靠。
  • parites:将要使用本 barrier 的线程的数量
  • n_waiting:正在等待本 barrier 的线程的数量
  • broken:栅栏是否为broken状态,返回一个布尔值
  • BrokenBarrierError:RuntimeError的子类,当栅栏被reset()或broken时引发;

使用代码示例如下(公司一起去旅游等人齐才出发):

import threading
import time
import random

class Staff(threading.Thread):
    def __init__(self, barriers):
        threading.Thread.__init__(self)
        self.barriers = barriers

    def run(self):
        print("员工 【" + self.name + "】" + "出门")
        time.sleep(random.randrange(110))
        print("员工 【" + self.name + "】" + "已签到")
        self.barriers.wait()

def ready():
    print(threading.current_thread().name + ":人齐,出发,出发~~~")

if __name__ == '__main__':
    print("要出去旅游啦,大家快集合~")
    b = threading.Barrier(10, action=ready, timeout=20)
    for i in range(10):
        staff = Staff(b)
        staff.start()

运行结果如下

要出去旅游啦,大家快集合~
员工 【Thread-1】出门
员工 【Thread-2】出门
员工 【Thread-3】出门
员工 【Thread-4】出门
员工 【Thread-5】出门
员工 【Thread-6】出门
员工 【Thread-7】出门
员工 【Thread-8】出门
员工 【Thread-9】出门
员工 【Thread-10】出门
员工 【Thread-8】已签到
员工 【Thread-4】已签到
员工 【Thread-5】已签到
员工 【Thread-6】已签到
员工 【Thread-9】已签到
员工 【Thread-2】已签到
员工 【Thread-3】已签到
员工 【Thread-7】已签到
员工 【Thread-1】已签到
员工 【Thread-10】已签到
Thread-10:人齐,出发,出发~~~

2、queue模块详解

Python中的queue模块中已经实现了一个线程安全的多生产者,多消费者队列,自带锁,常用于多线程并发数据交换。内置三种类型的队列:

  • Queue:FIFO(先进先出);
  • LifoQueue:LIFO(后进先出);
  • PriorityQueue:优先级最小的先出;

三种类型的队列的构造函数都是(maxsize=0),用于设置队列容量,如果设置的maxsize小于1,则表示队列的长度无限长。

两个异常

  • Queue.Empty:当调用非堵塞的get()获取空队列元素时会引发;
  • Queue.Full:当调用非堵塞的put()满队列里添加元素时会引发;

相关函数

  • size():返回队列的近似大小,注意:qsize()> 0不保证随后的get()不会 阻塞也不保证qsize() < maxsize后的put()不会堵塞;
  • empty():判断队列是否为空,返回布尔值,如果返回True,不保证后续 调用put()不会阻塞,同理,返回False也不保证get()调用不会被阻塞;
  • full():判断队列是否满,返回布尔值如果返回True,不保证后续 调用get()不会阻塞,同理,返回False也不保证put()调用不会被阻塞;
  • put(item, block=True, timeout=None):往队列中放入元素,如果block为True且timeout参数为None(默认),为堵塞型put(),如果timeout是 正数,会堵塞timeout时间并引发Queue.Full异常,如果block为False则 为非堵塞put()。
  • put_nowait(item):等价于put(item, False),非堵塞put()
  • get(block=True, timeout=None):移除一个队列元素,并返回该元素,如果block为True表示堵塞函数,block = False为非堵塞函数,如果设置了timeout,堵塞时最多堵塞超过多少秒,如果这段时间内没有可用的项,会引发Queue.Empty异常,如果为非堵塞状态,有数据可用返回数据无数据立即抛出Queue.Empty异常;
  • get_nowait():等价于get(False),非堵塞get()
  • task_done():完成一项工作后,调用该方法向队列发送一个完成信号,任务-1;
  • join():等队列为空,再执行别的操作;

使用代码示例如下

import threading
import queue
import time
import random

work_queue = queue.Queue()

# 任务模拟
def working():
    global work_queue
    while not work_queue.empty():
        data = work_queue.get()
        time.sleep(random.randrange(12))
        print("执行" + data)
        work_queue.task_done()

# 工作线程
class WorkThread(threading.Thread):
    def __init__(self, t_name, func):
        self.func = func
        threading.Thread.__init__(self, name=t_name)
    def run(self):
        self.func()

if __name__ == '__main__':
    work_list = []
    for i in range(121):
        work_list.append("任务 %d" % i)
    # 模拟把需要执行的任务放到队列中
    for i in work_list:
        work_queue.put(i)
    # 初始化一个线程列表
    threads = []
    for i in range(0, len(work_list)):
        t = WorkThread(t_name="线程" + str(i), func=working)
        t.daemon = True
        t.start()
        threads.append(t)
    work_queue.join()
    for t in threads:
        t.join()
    print("所有任务执行完毕")

运行结果如下

执行任务 1
执行任务 3
执行任务 5
执行任务 2
执行任务 4
执行任务 6
执行任务 8
执行任务 10
执行任务 13
执行任务 11
执行任务 17
执行任务 18
执行任务 19
执行任务 7
执行任务 14
执行任务 16
执行任务 9
执行任务 15
执行任务 12
执行任务 20
所有任务执行完毕

如果本文对你有所帮助,欢迎
留言,点赞,转发
素质三连,谢谢😘~