刘刚刚的个人博客

常见的IO模型


合理的处理IO可以提高系统的处理速度,提高CPU的利用率

通讯中主要包括网络IO和从内核拷贝数据到线程中,内核拷贝到线程中速度很快,因此IO一般在网络中。

网络IO 模型主要包括:
blocking IO 阻塞IO
nonblocking IO 非阻塞IO
IO multiplexing IO多路复用
signal driven IO 信号驱动IO
asynchronous IO 异步IO
信号驱动IO并不常用,所以主要介绍其余四种IO Model。

阻塞IO模型

当数据没有发送过来时,程序就在原地等待。

image-20200911231057153

import socket


server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)


while True:
    # 阻塞1
    conn, addr = server.accept()
    while True:
        try:
            # 阻塞2
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data)
            conn.send(data.upper())
        except ConnectionResetError as e:
            break
    conn.close()

即使使用了多线程或多进程,IO仍然存在

非阻塞IO

在非阻塞IO中,进程向操作系统查询接受数据时,如果没有数据,那么会返回一个ERRO。

image-20200911231501175

import socket
import time


server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)
# 将所有的网络阻塞变为非阻塞
server.setblocking(False)

r_list = []
del_list = []
while True:
    try:
        conn, addr = server.accept()
        r_list.append(conn)
    except BlockingIOError:

        for conn in r_list:
            try:
                data = conn.recv(1024)  # 没有消息 报错
                if len(data) == 0:  # 客户端断开链接
                    conn.close()  # 关闭conn
                    # 将无用的conn从r_list删除
                    del_list.append(conn)
                    continue
                conn.send(data.upper())
            # 判断数据是否为erro,是的话查询下一个链接
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_list.append(conn)
        # 移除无用的链接
        for conn in del_list:
            r_list.remove(conn)
        del_list.clear()

# 客户端
import socket


client = socket.socket()
client.connect(('127.0.0.1',8081))


while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)

非阻塞IO会导致即使没有任何链接或者数据的接受,cpu的负载也很高,

IO多路复用

IO多路复用,将数据的轮寻交给了系统处理。当某个socket有数据到达,则会通知程序进程下一步的操作

image-20200911234310199

from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist=[server,]
wlist=[]
wdata={}

while True:
    # 将server加入监听队列,如果没有用户连接则,程序在此处处于暂停状态
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print(wl)
    for sock in rl:
        if sock == server:
            conn,addr=sock.accept()
            rlist.append(conn)
        else:
            try:
                data=sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock]=data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

poll机制 只在linux有 poll和select都可以监管多个对象 但是poll监管的数量更多

上述select和poll机制其实都不是很完美 当监管的对象特别多的时候
可能会出现 极其大的延时响应

epoll机制只在linux有

它给每一个监管对象都绑定一个回调机制
一旦有响应 回调机制立刻发起提醒

针对不同的操作系统还需要考虑不同检测机制 书写代码太多繁琐
有一个人能够根据你跑的平台的不同自动帮你选择对应的监管机制
selectors模块可以根据平台自动选择机制类型

如果监管对象只有一个,那么IO多路复用的效率比阻塞IO效率也低

异步IO

异步IO的效率最高,也是最好的方案,可以通过一个线程实现并发的效果。

将需要等待的IO交给操作系统处理

# 3.5之前写法
import threading
import asyncio


@asyncio.coroutine
def hello():
    print('hello world %s'%threading.current_thread())
    yield from asyncio.sleep(1)  # 模拟IO
    print('hello world %s' % threading.current_thread())

# 生成对象
loop = asyncio.get_event_loop()
# 添加任务
tasks = [hello(),hello()]
# 运行到任务结束
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# 3.5及之后写法
import threading
import asyncio


async def hello():
    print('hello world %s'%threading.current_thread())
    await asyncio.sleep(1)  # 模拟IO
    print('hello world %s' % threading.current_thread())

recve是哪一步的操作

使用IO多路复用还是多线程呢

我的名片

昵称:shuta

职业:后台开发(php)

邮箱:648949076@qq.com

站点信息

建站时间: 2020/2/19
网站程序: ANTD PRO VUE + TP6.0
晋ICP备18007778号