Async
Coroutines achieve the effect of "concurrency" while using far fewer resources.
Keep in mind during development that a coroutine is only switched out when it reaches an await. In web development this means that blocking code (time-consuming synchronous code or CPU-bound blocks) must be offloaded to another process or thread, otherwise it will block the other requests.
P.S. Compared with Go: because of the GIL, running Python in a single thread is roughly like Go having only a single set of GMP.
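A minimal sketch of the switching behaviour described above (the coroutine names ticker, blocking_handler, and friendly_handler are made up for illustration): a synchronous time.sleep() never reaches an await, so it freezes the whole event loop, while asyncio.sleep() suspends at the await and lets other coroutines run.
import asyncio
import time

async def ticker():
    # Prints once per 0.2 s -- but only when the event loop gets control.
    for _ in range(5):
        print("tick", time.strftime("%X"))
        await asyncio.sleep(0.2)

async def blocking_handler():
    # time.sleep() is synchronous: no await is reached for a full second,
    # so ticker() cannot run during that time.
    time.sleep(1)

async def friendly_handler():
    # asyncio.sleep() suspends this coroutine at the await,
    # so ticker() keeps running in the meantime.
    await asyncio.sleep(1)

async def main():
    await asyncio.gather(ticker(), blocking_handler())   # ticks arrive late
    await asyncio.gather(ticker(), friendly_handler())   # ticks arrive on time

asyncio.run(main())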
A plain asynchronous call
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
create_task: concurrent asynchronous calls
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")
TaskGroup: asynchronous task groups
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))
        task2 = tg.create_task(
            say_after(2, 'world'))
        print(f"started at {time.strftime('%X')}")

    # The await is implicit when the context manager exits.
    print(f"finished at {time.strftime('%X')}")
Creating tasks in a loop:
Note: keep a reference to each task (e.g. add it to a set), otherwise the task may be garbage-collected while it is still running.
background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)
Using and terminating a task group
Raising an exception from a task inside the group causes the remaining tasks in the group to be cancelled.
import asyncio
from asyncio import TaskGroup

class TerminateTaskGroup(Exception):
    """Exception raised to terminate a task group."""

async def force_terminate_task_group():
    """Used to force termination of a task group."""
    raise TerminateTaskGroup()

async def job(task_id, sleep_time):
    print(f'Task {task_id}: start')
    await asyncio.sleep(sleep_time)
    print(f'Task {task_id}: done')

async def main():
    try:
        async with TaskGroup() as group:
            # spawn some tasks
            group.create_task(job(1, 0.5))
            group.create_task(job(2, 1.5))
            # sleep for 1 second
            await asyncio.sleep(1)
            # add an exception-raising task to force the group to terminate
            group.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass

asyncio.run(main())
gather: create and run tasks concurrently
By default, an exception in one awaitable is propagated to the caller of gather(), but it does not stop the other tasks, which keep running (a sketch of collecting exceptions with return_exceptions=True follows the example below).
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())
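A minimal sketch of the exception behaviour mentioned above (the coroutines boom and slow_ok are made up for illustration): with return_exceptions=True the raised exception is returned in the result list instead of being re-raised, and the other awaitables still run to completion.
import asyncio

async def boom():
    raise ValueError("failure in one task")

async def slow_ok():
    await asyncio.sleep(0.1)
    return "ok"

async def main():
    # The exception is collected into the result list; slow_ok() still finishes.
    results = await asyncio.gather(boom(), slow_ok(), return_exceptions=True)
    print(results)  # [ValueError('failure in one task'), 'ok']

asyncio.run(main())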
wait_for / timeout: limiting how long a coroutine may run
async def main():
    async with asyncio.timeout(10):
        await long_running_task()
# Related APIs
asyncio.timeout_at
asyncio.wait_for
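A minimal sketch of asyncio.timeout_at(), which takes an absolute deadline measured on the event loop's clock instead of a relative delay (the 1-second deadline is just an example value):
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 1.0  # absolute point in time on the loop's clock
    try:
        async with asyncio.timeout_at(deadline):
            await asyncio.sleep(10)  # cancelled when the deadline is reached
    except TimeoutError:
        print('timeout!')

asyncio.run(main())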
async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())
Running a synchronous function in another thread
def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")

asyncio.run(main())
Difference from run_coroutine_threadsafe: when non-async code needs to run a coroutine on an event loop that lives in another thread, use run_coroutine_threadsafe.
import asyncio
import threading

# Define a coroutine
async def my_coroutine():
    await asyncio.sleep(1)
    return "Done"

# Run an event loop in another thread
def start_event_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

# Submit the coroutine to that event loop from the main thread
def submit_coroutine(loop):
    future = asyncio.run_coroutine_threadsafe(my_coroutine(), loop)
    print(future.result())  # wait for and fetch the result

# Create a new event loop
loop = asyncio.new_event_loop()

# Start the event-loop thread
thread = threading.Thread(target=start_event_loop, args=(loop,))
thread.start()

# Submit the coroutine from the main thread
submit_coroutine(loop)

# Shut down the event loop
loop.call_soon_threadsafe(loop.stop)
thread.join()
Miscellaneous
# With the delay set to 0, a long-running task can yield control so that other tasks get a chance to run
asyncio.sleep

# Improve coroutine efficiency: tasks start running eagerly and only hand control
# back to the loop when they actually block, reducing the loop's scheduling overhead
asyncio.create_eager_task_factory(custom_task_constructor)

# Protect an awaitable from being cancelled
asyncio.shield(aw)

# Wait for the given awaitables; returns the sets of tasks that are done and those still pending
done, pending = await asyncio.wait(aws, timeout=None, return_when=asyncio.ALL_COMPLETED)

# Iterate over awaitables, getting each result as soon as it completes (see the sketch after this list)
asyncio.as_completed
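A minimal sketch of asyncio.as_completed(), consuming results in completion order rather than submission order (the coroutine fetch is made up for illustration):
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    aws = [fetch("slow", 2), fetch("fast", 1)]
    # Results arrive in completion order: "fast" is printed before "slow".
    for coro in asyncio.as_completed(aws):
        result = await coro
        print(result)

asyncio.run(main())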
Besides the functions above, Python also provides a set of functions for inspecting the state of coroutines and tasks.
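A minimal sketch of some of those introspection helpers, assuming a trivial worker coroutine made up for illustration:
import asyncio

async def worker():
    await asyncio.sleep(0.1)
    return 42

async def main():
    task = asyncio.create_task(worker(), name="worker-1")
    print(task.get_name(), task.done())       # worker-1 False
    print(asyncio.current_task().get_name())  # the task running main()
    print(len(asyncio.all_tasks()))           # unfinished tasks on the running loop
    await task
    print(task.done(), task.result())         # True 42

asyncio.run(main())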
Question:
In FastAPI, if a blocking function is called inside an async endpoint, will it block other requests?
Yes. Move the blocking function to another thread, or to another process for CPU-bound work (a hypothetical FastAPI endpoint is sketched after the two examples below).
# Run it in a separate process
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
# Run it in a separate thread
import asyncio

def blocking_function():
    # Simulate a blocking operation
    import time
    time.sleep(5)
    return "Done"

async def main():
    result = await asyncio.to_thread(blocking_function)
    print(result)

asyncio.run(main())
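A hedged sketch of how this looks inside a FastAPI endpoint; the app object, the /work path, and blocking_function are illustrative assumptions, not from the original:
import asyncio
import time
from fastapi import FastAPI

app = FastAPI()

def blocking_function():
    time.sleep(5)  # stands in for any blocking call
    return "Done"

@app.get("/work")  # hypothetical path
async def work():
    # Offload the blocking call so the event loop keeps serving other requests.
    result = await asyncio.to_thread(blocking_function)
    return {"result": result}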
License:
CC BY 4.0