flight_schedule = {
'boston': [3, 2],
'detroit': [7, 4],
'new york': [1, 9],
}
asyncdefmain():
asyncwith asyncio.TaskGroup() as tg:
for city, (departure_time, duration) in flight_schedule.items():
tg.create_task(simulate_flight(city, departure_time, duration))
print("Simulations done.")
asyncio.run(main())
这就是async with asyncio.TaskGroup() as tg,只要将所有需要异步完成的任务都放在这个代码里面,那么它就会让这个里面所有任务完成之后再继续跑后面的任务了,相比之前更加优雅简单。
这里还有一个比较有意思的特性,就是这些异步任务执行的时候可能会出现某些任务失败,某些任务成功的情况。在此之前,Python的处理很简单粗暴,就是遇到任意一个任务失败,整个程序崩溃。但是在这个async with asyncio.TaskGroup() as tg下的所有任务运行遇到失败的情况,它会把错误收集到一个错误组中,告诉你哪些是失败的。同时,如果你加上如下两种错误捕获的语句:
try:
asyncio.run(main())
except ExceptionGroup as eg:
print(f"Caught exceptions: {eg}")
或者
try:
asyncio.run(main())
except* ValueError as eg:
print(f"Caught ValueErrors: {eg}")
from functools import singledispatch
@singledispatchdefhalf(x):
"""Returns the half of a number"""return x / 2@half.registerdef_(x: int):
"""For integers, return an integer"""return x // 2@half.registerdef_(x: list):
"""For a list of items, get the first half of it."""
list_length = len(x)
return x[: list_length // 2]
# Outputs:print(half(3.6)) # 1.8print(half(15)) # 7print(half([1, 2, 3, 4])) # [1, 2]