python 中多进程以及多线程编程的总结

3,842 阅读11分钟
原文链接: sunms.codefly.top

这里总结一下,最近学习的python中的多进程以及多线程编程的相关知识。介绍python中的多进程以及多线程编程之前,首先,要先搞清楚什么是进程,什么是线程。

进程是一个具有独立功能的程序关于某个数据集合的一次运行活动。它可以申请和拥有系统资源,是一个动态的概念,是一个活动的实体。主要注意的是它不只是程序的代码。

线程是进程中的一个实体,是被系统独立调度和分派的基本单位。也就是说线程是进程的一部分。

上面的解释很生硬,我用白话解释一下:进程就是执行中的程序,能够单独存在。线程是进程的一部分,依赖与进程所存在的,不能单独存在。我们平常所说的计算机的多任务的处理,既可以依靠多进程的方式也可以依靠多线程的方式来实现。

了解了进程以及线程的基本概念以后,我们在了解一下什么是多进程编程以及什么是多线程编程。

 多进程编程:在编程中为了解决一个需求,在程序中同时启动多个进程进行数据的处理的编程方式,叫做多进程编程。

多线程编程:在一个进程中,采用同时执行多个线程的方式进行数据的处理的编程方式,叫做多线程编程。

python是同时支持多进程以及多线程编程的。但是python中的多线程由于python解释器本身的GIL锁的存在,并不能发挥出全部的”实力”,所以很多人说python中的多线程很鸡肋,但是其并不是完全一无是处的,讲到多线程的时候,再做解释。

python中的多进程:

接触过Unix/Linux环境下C语言开发的同学们都知道,在Unix/Linux中系统内核为我们创建进程提供了fork()函数,这个函数比较特殊,特殊在于它具有两个返回值,分别返回0以及创建的子进程的p_id(进程编号),我们可以根据其具体返回的值,来判断当前进程是子进程还是父进程。下面贴一段c代码,方便大家理解。

#include   
#include    
int main (){   
    //pid表示fork函数返回的值 
    int pid; 
    int count=0;  
    //创建子进程,如果创建成功,就返回两个值,一个值为0,一个值为创建的子进程的p_id(>0)
    //如果创建子进程失败,就返回负数
    pid=fork();     
    if (fpid < 0)   
        printf("进程创建失败\n");   
    else if (fpid == 0) {  
        printf("i am the child process, my process id is %d/n",getpid());   
        printf("我是子进程\n");  
        count++;  
    }  
    else {  
        printf("i am the parent process, my process id is %d/n",getpid());   
        printf("我是父进程/n");  
        count++;  
    }  
    printf("统计结果是: %d/n",count);  
    return 0;  
} 

上面的代码是一段标准的创建进程的”套路”。在将具体的python多进程编程之前,还有一点必须得说清楚,就是子进程以及父进程的关系。创建子进程的时候,操作系统会将父进程中的内存资源为子进程拷贝一份,也就说子进程创建之处二者之间的资源是完全相同的,虽然相同但是分属与不同的内存的区间,数据之间也就没有任何的管联性了。好了,说清楚了这一点,就直接来看python中的多进程编程的具体实现吧。

python为我们提供了multiprocessing这个跨平台的模块来帮助我们实现多进程代码的编写。下面贴出一段经典的套路步骤:

from multiprocessing import Process
import os

# 以函数的形式定义子进程需要执行的具体的代码
def run_proc(name):
    #os模块的getpid()方法可以获取当前进程的进程id
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    #创建一个进程实例
    p = Process(target=run_proc, args=('test',))
    #调用strat()方法,开始执行子进程
    p.start()
    #调用进程的join()方法,来阻塞除当前进程以外的所有进程
    #当该进程执行完毕以后,再执行其他进程(这里指的是主进程)
    p.join()
    #子进程执行完毕,父进程继续往下执行
    print('Child process end.')

除了上面代码提出的进程相关的方法以外,python中多进程编程中还有一个属性经常提到,那就是daemon属性。这个属性是用来设置是否将当前进程设置为守护进程(也叫后台进程)。如果子进程设置了该属性为True时,当主进程执行完成以后,不管子进程时候还在执行,都会强制终止子进程的执行。

上面的代码中,我们只是创建了一个进程,如果需要创建多个进程,怎们办那?这里有两种解决方案,一种是按照上面的方式依次创建多个进程,而是采用进程池来进行创建。我们先说依次创建的方式:

pass
p1 = Process(target="函数名",args=("函数的参数",))
p2 = Process(target="函数名",args=("函数的参数",))
p1.start()
p2.start()
p1.join()
p2.join()

当然了,在实际情况下,我们不会按照上面的方式去创建,这样就太笨了,而是通过循环进行创建。不过上面的例子也肯定没有问题,这样写是因为有很多人在创建进程的时候,会进入误区,他们会这样写:

pass
p1 = Process(target="函数名",args=("函数的参数",))
p2 = Process(target="函数名",args=("函数的参数",))
p1.start()
p1.join()
p2.start()
p2.join()

上面的写法其实是错误的,很多新手会犯这么错,具体错在那里那,我们来分析一下:

  当程序执行到p1.join()的时候,当前有两个进程(主进程/子进程p1)
p1.join()会阻塞掉主进程,此时只会执行进程p1
当进程p1执行完毕,对于主进程的阻塞取消,当前执行进程只有主进程
当p2.start()执行完毕,当前进程有主进程和子进程p2
当p2.join()执行完毕,子进程p2会阻塞掉主进程

按照上上面的写法再来分析一下:

   主进程先执行
主进程和子进程p1并发执行
主进程和子进程p1,子进程p2并发执行
p1.join()执行,主进程和子进程p2都被阻塞
p1执行完毕,退出 p2.join()执行,主进程执行
…….

可以看出,最早的写法发生进程阻塞的情况要少,这样就能充分发挥利用cpu,提高执行效率。

好了,说完了依次创建进程的方式,再来说说进程池的方式,直接看代码:

pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
message = "hello world %d"%i
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply_async(func, (message, ))
pool.close()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()   
print "all over"

这种方式很简单,可以很方便的创建十几个甚至几十个进程出来,具体的函数使用很简单,我就不细说了。这里只提出一点:进程池有两个函数需要注意一下,分别是apply()函数,以及apply_async()函数。apply()是指进程池中的子进程并发的执行,apply_async()指的是进程中的子进程依次的执行。

下面简单聊一下,python中多进程之间的通信。做过linux环境下c开发的同学都清楚,进程之间的通信可以通过队列,管道,Event以及Socket来实现。python中的multiprocessing包为我们提供了很好的封装,我们可以很简单的实现进程之间的通信,我这里就简单举几个例子说明一下,不做赘述了。(如果对这方面不熟悉的小伙伴,可以先看一下linux c开发,这样理解起来要简单一些)

 1:通过队列Quere实现进程之间的通信

#向队列中写入数据
def work_1(q):
       try:
           n=1
           while n<20:
               print("work_1,%d"%n)
               q.put(n)
               time.sleep(1)
               n+=1
       except BaseException:
            print("work_1 error")
       finally:
           print("work_1 end!!!")
        
#取出队列中的数据        
def work_2(q):
       try:
           n=1
           while n<20:
                print("word_2,%d"%q.get())
                time.sleep(1)
                n+=1
       except BaseException:
           print("work_2 error")
       finally:
           print("work_2 end")

if __name__ == "__main__":
      q= multiprocessing.Queue()
      p1=multiprocessing.Process(target=work_1,args=(q,))
      p2=multiprocessing.Process(target=work_2,args=(q,))
      p1.start()
      p2.start()
      p1.join()
      p2.join()
      print("all over")

注意:put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

 2:通过事件Event实现进程之间的通信

def wait_for_event(e):
    print("等待event事件")
    e.wait()
    print("等待event事件:e.is_set()->"+str(e.is_set()))
    print("")

def wait_for_event_timeout(e,t):
    print("等待event事件---带有超时事件")
    e.wait(t)
    print("等待event事件---带有超时事件--wait_for_event_timeout:e.is_set()->"+str(e.is_set()))
    print("")

if __name__ == "__main__":
    e=multiprocessing.Event()
    p1=multiprocessing.Process(target=wait_for_event,args=(e,))
    p2=multiprocessing.Process(target=wait_for_event_timeout,args=(e,2,))
    p1.start()
    p2.start()
    time.sleep(3)
    e.set()
    print("设置event")

说明:程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
一旦该进程通过wait()方法进入等待状态,直到另一个进程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的进程恢复运行。

………………………….

多进程编程中,除了进程之间的通信,还有一点比较重要,就是进程锁。如果多个进程同时对同一个文件进行操作,很有可能会导致文件内容被”篡改”的情况出现。因此,多进程在对共享数据进行操作的时候,需要加锁,防止数据被”篡改”。python的multiprocessing模块也提供的进程锁,看代码:

def worker_1(lock,file_name):
      lock.acquire()
      try:
            f=open(file_name,"a+")
            f.write("hahahah\n")
            f.close()
      finally:
            lock.release()
            print("work_1")

def worker_2(lock,file_name):
      lock.acquire()
      try:
            f=open(file_name,"a+")
            f.write("uuuuuu\n")
            f.close()
      finally:
            lock.release()
            print("work_2")

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f="./test.txt"
    p1=multiprocessing.Process(target=worker_1,args=(lock,f,))
    p2=multiprocessing.Process(target=worker_2,args=(lock,f,))

注意:Lock对象的状态可以为locked和unlocked。使用acquire()设置为locked状态。使用release()设置为unlocked状态。如果当前的状态为unlocked,则acquire()会将状态改为locked然后立即返回。当状态为locked的时候,acquire()将被阻塞直到另一个线程中调用release()来将状态改为unlocked,然后acquire()才可以再次将状态置为locked。Lock.acquire(blocking=True, timeout=-1),blocking参数表示是否阻塞当前线程等待,timeout表示阻塞时的等待时间 。如果成功地获得lock,则acquire()函数返回True,否则返回False,timeout超时时如果还没有获得lock仍然返回False。

聊完的多进程编程,我们再来聊聊多线程编程。任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程。多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

python中用来进行多线程编程的模块有两个。一个是threading模块,一个是thread模块。threading是一个高级模块,一般来说,编写多线程的代码的时候都是通过threading模块来实现。先来看看python中多线程编程的经典代码片段吧。

def thread_student():
    print("Hello,%s(in %s)"%(student,threading.current_thread().name))

def thread_process(name):
    thread_student()

if __name__ == "__main__":
    t1 = threading.Thread(target=thread_process,args=("thread_a",))
    t2 = threading.Thread(target=thread_process,args=("thread_b",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("all over")

看到了吧,多线程编程和多进程编程方式是非常类似的。多线程编程也会涉及到锁,通信等概念,由于在实际开发中,python多线程因为其自身的缺点,导致不会频繁的使用,所以,这里就不展开讲了(其实和python的多进程编程很像,又需要的小伙伴可以查一下)。

在最后解释一下,最先说过的python线程为什么鸡肋的问题:Python的线程虽然是真正的线程,但解释器执行代码时,有有一个GIL锁:Global Interpreter Lock,任何Python线程在执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,并不能做到真正的并发执行。