进程与python模块的使用——multiprocess、queue
程序是硬盘中的代码,进程则是程序正在运行状态的抽象
通过进程和线程的切换,可以最大程度的提高cpu的利用率。让程序更快的获取执行结果。
程序是硬盘中的代码,进程则是程序正在运行状态的抽象。
进程的调度算法
先来先服务(FCSC):对长作业有利,对短作业无利
短作业优先调度算法(SPN):对短作业有利,对长作业无利。
时间片轮转法+多级反馈
时间片轮转:将CPU的执行时间进行划分,比如每个进程执行x毫秒或者执行中遇到IO,就切换到下一个进程。
(1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。
(2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去。(3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。若cpu在执行优先级低队列时,有程序进入高优先级队列,则会立刻将此进程放入队列末尾,把CPU分配给新到高优先级进程。
进程三状态切换
当进程遇到IO后,会进入阻塞态,同时交出CPU执行权限,执行完IO操作就进入就绪态,等待CPU的执行·
同步与异步
同步:当一个程序运行后,一直等待结果的返回
异步:当一个程序运行后,在返回结果前处理其他的程序
程序运行的理想状态:代码一直在就绪态和运行态之间切换
僵尸进程与孤儿进程
僵尸进程:当一个进程开了子进程后,子进程运行结束,并不会直接释放进程号,因为有可能父进行还需要子进程的一些内容。所有的进程最后都会转为僵尸进程。
孤儿进程:子进程存活,父进程意外死亡。此时,子进程将由操作系统进行管理
进程的创建
进程的创建细节由操作系统完成,只需要调用接口即可完成。
进程中的数据是隔离的,如果需要数据传输可以借助其他应用或模块
winows和linux中进程的区别:
windows中创建进程时发生什么?
系统会将原来的代码复制
运行一遍代码以产生命名空间
运行创建进程时指定的函数
linux中创建进程会发生什么?
调用系统的fork语句
fork将当前代码的运行空间复制一份,包括目前运行到的状态
原进程中的fork返回创建出来的进程号,新创建出来的进程的fork返回0
在不同的系统要注意:
windows中需要进行
__main__
的判断,防止循环创建windows中会运行完代码后才调用进程要调用的函数
python中创建进程
# 最常使用的方法
from multiprocessing import Process
import time
def task(name):
print('我被创建了')
if __name__ == '__main__':
# 1 创建一个对象
p = Process(target=task, args=('shuta',))
# 2 开启进程
p.start() # 告诉操作系统帮你创建一个进程 异步
print('hello')
# 第二种方式 类的继承
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self):
print('我被创建了')
if __name__ == '__main__':
p = MyProcess()
p.start()
print('hello')
# 使用is_alive可以判断进程是否存活
p.is_alive()
# 查看进程号
import os
# 当前进程号
os.getpid()
# 父进程号
os.getppid()
使用join等待进程的执行
使用join后,进程会等待子进程执行完毕再执行主进程后边的代码
from multiprocessing import Process
import time
def task(name, n):
print('进程被创建了')
time.sleep(1)
if __name__ == '__main__':
p = Process(target=task, args=('jason', 1))
start_time = time.time()
p.start()
p.join() # p进程执行结束后,才会执行后边的语句
print('hello', time.time() - start_time)
使用守护进程
正常情况下,进程会等待子进程执行完毕后才会结束,如果子进程为守护进程,那么在父进程结束后,子进程也会马上结束。
注意:父进程结束指:1. 自己的代码运行完毕 2.其他非守护进程的子进程运行结束
from multiprocessing import Process
import time
def task():
print('进程被创建')
time.sleep(3)
print('进程结束')
if __name__ == '__main__':
p = Process(target=task,)
# 在进程启动前,设置为守护进程
p.daemon = True
p.start()
print('主进程结束')
进程中使用互斥锁
多个进程操作同一份数据的时候,会出现数据错乱的问题,可以使用lock模块来对需要保证安全的数据进行加锁
针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全
# 多线程模拟买票的案例,票的信息存储在文件中
from multiprocessing import Process, Lock
import json
import time
import random
# 查票
def search(i):
# 文件操作读取票数
with open('data','r',encoding='utf8') as f:
dic = json.load(f)
print('用户%s查询余票:%s'%(i, dic.get('ticket_num')))
# 字典取值不要用[]的形式 推荐使用get 这样不会报错!!!
# 买票 1.先查 2.再买
def buy(i):
# 先查票
with open('data','r',encoding='utf8') as f:
dic = json.load(f)
# 模拟网络延迟
time.sleep(random.randint(1,3))
# 判断当前是否有票
if dic.get('ticket_num') > 0:
# 修改数据库 买票
dic['ticket_num'] -= 1
# 写入数据库
with open('data','w',encoding='utf8') as f:
json.dump(dic,f)
print('用户%s买票成功'%i)
else:
print('用户%s买票失败'%i)
# 整合上面两个函数
def run(i, mutex):
search(i)
# 给买票环节加锁处理
# 抢锁
mutex.acquire()
buy(i)
# 释放锁
mutex.release()
if __name__ == '__main__':
# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
mutex = Lock()
for i in range(1,11):
p = Process(target=run, args=(i, mutex))
p.start()
进程间通讯
queue的使用
form multiprocess import Queue
# 创建队列,传入队列的大小
q = Queue(5)
# 往队列中存数据
q.put(111)
q.put(222)
# 在队列中取数据
v1 = q.get()
v2 = q.get()
v = q.get_nowait() # 没有数据直接报错queue.Empty
v = q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错
# 后进先出队列
q = queue.LifoQueue(3)
# 优先级排序队列
q = queue.PriorityQueue(4)
q.put((10, '111'))
print(q.get())
进程间通讯(IPC)
# 借助与队列的通讯(生产者与消费者模型)
from multiprocessing import Queue, Process
def producer(q):
q.put('生产的数据')
def consumer(q):
print(q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=producer,args=(q,))
p1 = Process(target=consumer,args=(q,))
p.start()
p1.start()
进程池
为什么要用池?
频繁的创建销毁进程会降低计算机的执行效率
提前将进程创建好可以提高请求到来时的处理速度
防止同时的请求过多,消耗系统过多的资源,降低的系统的请求处理速度
示例:
from concurrent.futures import ThreadPoolExecutor
import time
import os
# 1. 设置好进程池的大小,并生成进程池
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程
def task(n):
print(n,os.getpid())
time.sleep(2)
return n**n
def call_back(n):
print('call_back>>>:',n.result())
'''
任务的提交方式
同步:提交任务之后原地等待任务的返回结果 期间不做任何事
异步:提交任务之后不等待任务的返回结果 执行继续往下执行
返回结果如何获取???
异步提交任务的返回结果 应该通过回调机制来获取
一旦该任务有结果立刻触发回调
'''
# 使用同步的方式提交
if __name__ == '__main__':
# 朝池中提交任务,传入需要的参数
pool.submit(task, 1)
print('主')
t_list = []
# 朝池子中提交20个任务
for i in range(20):
# 任务完成后,调用call_back方法
res = pool.submit(task, i).add_done_callback(call_back)
使用异步的方式提交:
if __name__ == '__main__':
pool.submit(task, 1) # 朝池子中提交任务 异步提交
print('主')
t_list = []
for i in range(20): # 朝池子中提交20个任务
res = pool.submit(task, i)
print(res.result()) # result方法 同步提交
t_list.append(res)
# 等待池中所有的任务执行完毕之后,关闭线程池,再继续执行后续的代码
pool.shutdown()
for t in t_list:
print('>>>:',t.result())
互斥锁是如何实现的?
queue是如何实现的