2022年 11月 9日

(六)进程,多进程,进程池,进程间通信

通常使用multiprocessing创建进程,父进程只用作进程的创建和回收,不做其他工作。
multiprocessing 进程对象属性
p.start() 启动进程
p.join()  回收进程
p.is_alive() 判断进程生命周期状态 返回True False
p.name 进程名称 默认为Process-1 如果起名字则为自定义名称
p.pid  创建的进程的PID
p.daemon  
默认值为False 父进程退出不会影响子进程运行
如果设置为True 则父进程退出子进程也会退出

* daemon 的设置必须在start前
* 如果设置daemon为True 则不再使用join

 

  1. from multiprocessing import Process 
  2. from time import sleep,ctime 
  3. def tm():
  4.     while True:
  5.         sleep(2)
  6.         print(ctime())
  7. p = Process(target = tm)
  8. #在start前设置daemon为true
  9. p.daemon = True 
  10. p.start()
  11. sleep(5)
  12. print("main process over")

创建自己的进程类
1.继承process类
2.重写 __init__ 并且调用父类的__init__

3.重写run方法,此时生成对象后 调用start就会自动运行run

  1. from multiprocessing import Process 
  2. from time import sleep 
  3. def worker(sec,name):
  4.     for i in range(3):
  5.         sleep(sec)
  6.         print("I'm %s"%name)
  7.         print("I'm working.....")
  8. #通过args给函数传参        通过kwargs给函数传参
  9. p = Process(name = "Worker",target = worker,args = (2,),kwargs = {'name':'Levi'})
  10. p.start()
  11. print("is alive :",p.is_alive()) #判断进程状态 
  12. print("process name:",p.name) #进程名
  13. print("process PID:",p.pid) #子进程PID
  14. p.join()
  15. print("=====Process over========")
  1. from multiprocessing import Process 
  2. from time import sleep 
  3. import os 
  4. def th1():
  5.     sleep(3)
  6.     print("吃饭")
  7.     print(os.getppid(),"----",os.getpid())
  8. def th2():
  9.     sleep(2)
  10.     print("睡觉")
  11.     print(os.getppid(),"----",os.getpid())
  12. def th3():
  13.     sleep(4)
  14.     print("打豆豆")
  15.     print(os.getppid(),"----",os.getpid())
  16. things = [th1,th2,th3]
  17. process = []
  18. for th in things:
  19.     p = Process(target = th)
  20.     process.append(p) #保存进程对象
  21.     p.start()
  22. for p in process:
  23.     p.join()
  1. from multiprocessing import Process 
  2. import time 
  3. class ClockProcess(Process):
  4.     def __init__(self,value):
  5.         super().__init__() #调用父类的__init__
  6.         self.value = value    
  7.     def run(self): #重写run方法
  8.         for i in range(5):
  9.             time.sleep(self.value)
  10.             print("The time is {}".\
  11.                 format(time.ctime()))
  12. p = ClockProcess(2) #用自己的类创建进程对象
  13. #自动执行run 
  14. '''
  15. def start(self):
  16.     self.run()
  17. '''
  18. p.start()
  19. p.join()

多进程
优点:能并行执行多个任务,提高效率
     创建方便,运行独立,不受其他进程影响
     数据安全
缺点:进程的创建和删除都需要消耗计算机的资源

进程池技术
产生原因:如果有大量任务需要多进程完成,且可能需要频繁的创建和删除进程,给计算机带来大量的资源消耗。
原理:在进程池内运行一定数量进程,通过这些进程完成进程池队列中的事件,
     直到事件执行完毕,减少进程不断的创建删除过程。
实施操作方法:
1.创建进程池,在进程池中放入适当进程
2.将事件加入到进程池队列
3.事件不断运行,直到所有事件运行完毕
4.关闭进程池,回收进程

 from multiprocessing import Pool

pool = Pool(processes)
功能:创建进程池对象
参数:表示进程池中有多少进程

pool.apply_async(func,args,kwds)
功能:将事件放入进程池队列
参数:func 要执行的事件
        args 给func用元组传参
          kwds 给func用字典传参

返回值:返回事件对象  通过get()方法获取事件函数返回值

pool.closs()

功能:关闭进程池,不能再添加新的事件

pool.join()
功能:阻塞等待回收进程池

pool.map(func,iter)
功能:将要完成的事件放入进程池
参数:func要完成的事件函数
       iter可迭代对象给func传参
返回值:事件函数的返回值列表
 

  1. from multiprocessing import Pool 
  2. from time import sleep,ctime 
  3. def worker(msg): #事件函数
  4.     sleep(2)
  5.     print(msg)
  6.     return msg + "over"
  7. pool = Pool(processes = 4) #创建进程池,放4个进程
  8. result = []
  9. for i in range(10):
  10.     msg = "hello %d"%i
  11.     r = pool.apply_async(func = worker,args = (msg,))#将事件放入进程池
  12.     result.append(r)#保存返回值对象
  13.     # pool.apply(func = worker,args = (msg,))
  14. pool.close() #关闭进程池
  15. pool.join()#回收进程池
  16. #获取事件函数的返回值
  17. for i in result:
  18.     print(i.get())
  1. from multiprocessing import Pool 
  2. import time 
  3. def fun(n):
  4.     time.sleep(1)
  5.     print("执行 pool map事件")
  6.     return n * n 
  7. pool = Pool(4) #创建进程池
  8. r = pool.map(fun,range(6))#使用map将事件放入进程池
  9. print("返回值列表r",r)
  10. pool.close()
  11. pool.join()

cookie
获取文件大小
os.path.getsize(path)
功能:获取一个文件的大小
参数:文件

进程间通信
进程间由于空间独立,资源无法互相直接获取,此时在不同的进程间进行数据传递就需要专门的通信方法
进程间通信方法(IPC)

管道  消息队列   共享内存   信号   信号量   套接字

管道通信  Pipe
管道:在内存中开辟一段空间,形成管道结构,管道对多个进程可见,进程可以对管道进行读写操作
multiprocess  —>  Pipe
fd1,fd2 = Pipe(duplex = True)
功能:创建一个管道
参数:默认为双向管道
    如果设置为False 则为单向管道
返回值:如果双向管道,fd1 fd2都可以进行读写操作
       如果是单向管道,则fd1 只可读,fd2只可写
fd.recv()
功能:从管道读取内容
返回值:读到的内容

* 如果管道无内容则阻塞

fd.send()
功能:向管道写入内容
参数:要发送的内容

* 几乎可以发送所有python支持的数据
 

  1. from multiprocessing import Process,Pipe 
  2. import os,time 
  3. fd1,fd2 = Pipe(False)#创建管道
  4. def fun(name):
  5.     time.sleep(3)
  6.     fd2.send({'a':1,'b':2}) #向管道写入内容
  7. jobs = []
  8. for i in range(5):
  9.     p = Process(target = fun,args = (i,))   
  10.     jobs.append(p)
  11.     p.start()
  12. for i in range(5):
  13.     data = fd1.recv()    #读取管道
  14.     print(data)
  15. for i in jobs:
  16.     i.join()

消息队列
队列:先进先出
在内存中开辟队列结构空间,多个进程可以向队列投放消息,在取出的时候按照存入顺序取出

创建队列
q = Queue(maxsize = 0)
功能:创建队列
参数:maxsize 默认表示根据系统分配空间存储消息
    如果传入一个正整数则表示最多存放多少条消息
返回:队列对象

q.put(data,[block,timeout])
功能:存放消息
参数:data 存入的消息(python数据类型)
       block 默认为True表示当队列满的时候阻塞 ,设置为False则表示非阻塞

 

         timeout 当block为True表示超时时间

data = q.get([block,timeout])
功能:取出消息
参数:block 默认为True 当队列空时阻塞 , 设置为False表示非阻塞
        timeout 当block为True时表示超时时间
返回值:返回获取到的消息

q.full() 判断队列是否为满
q.empty() 判断队列是否为空
q.qsize() 判断当前队列有多少消息
q.close() 关闭队列

 

 

 

 

  1. from multiprocessing import Queue
  2. from time import sleep 
  3. q = Queue(3)#创建消息队列
  4. q.put(1)
  5. sleep(0.1)
  6. print(q.empty())
  7. print(q.full())
  8. q.put(2)
  9. q.put(3)
  10. print(q.qsize())
  11. print(q.full())
  12. #队列已满则阻塞
  13. # q.put(4,True,3)
  14. print(q.get())
  15. print(q.qsize())
  16. q.close()
  1. from multiprocessing import Process,Queqe
  2. import time

q = Queue() #创建消息队列def fun1(): time.sleep(1)  q.put([1,2,3,4])

def fun2():    print(“收到消息:”,q.get())p1 = Process(target = fun1)p2 = Process(target = fun2)p1.start()p2.start()p1.join()p2.join()     共享内存
在内存中开辟一段空间,存储数据,对多个进程可见每次写入共享内存中的数据会覆盖之前的内容
from multiprocessing import Value

obj = Value(ctype,obj)
功能:开辟共享内存空间
参数:ctype 字符串  要转变的c的数据类型    (对比类型对照表)
           obj 共享内存的初始化数据
返回:共享内存对象

obj.value 表示共享内存中的值,对其修改或者使用即可

obj = Array(ctype,obj)
功能:开辟共享内存
参数:ctype 要转化的c的类型
        obj 要存入共享内存的数据
         列表  将列表存入共享内存,数据类型一致
        正整数  表示开辟几个数据空间

  1. from multiprocessing import Process,Array 
  2. import time 
  3. # shm = Array('i',[1,2,3,4,5]) #创建共享内存,存入列表
  4. shm = Array('i',5#表示开辟5个空间
  5. def fun(): 
  6.     for i in shm:#shm 是可迭代对象
  7.         print(i)
  8.     shm[3] = 1000  #修改共享内存
  9. p = Process(target = fun)
  10. p.start()
  11. p.join()
  12. for i in shm:
  13.     print(i)
  1. from multiprocessing import Process,Value 
  2. import time 
  3. import random 
  4. money = Value('i',2000#创建共享内存对象 #存放整形,初始为2000
  5. #存钱
  6. def deposite():
  7.     for i in range(100):
  8.         time.sleep(0.05)   
  9.         money.value += random.randint(1,200)#对money value属性进行操作
  10. #取钱
  11. def withdraw():
  12.     for i in range(100):
  13.         time.sleep(0.04)
  14.         money.value -= random.randint(1,200)
  15. d = Process(target = deposite)
  16. w = Process(target = withdraw)
  17. d.start()
  18. w.start()
  19. d.join()
  20. w.join()
  21. print(money.value)#查看共享内存数据
  管道 消息队列 共享内存
开辟空间 内存 内存 内存
读写方式 两端读写 (双向/单向)   先进先出 操作覆盖内存
效率 一般   一般 较快
应用 多用于两端通信  使用广泛 复杂,需要同步互斥机制