Python - 异步编程
目录
- asyncio.as_complete 即时获得协程返回的结果
- 使用asyncio 和 HTTPX 下载
- 原生协程的秘密
asyncio.as_complete 即时获得协程返回的结果
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4 # 1
async def probe(domain: str) -> tuple[str, bool]: # 2
loop = asyncio.get_running_loop() # 3
try:
await loop.getaddrinfo(domain, None) # 4
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None: # 5
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # 6
domains = (f'{name}.dev'.lower() for name in names)
coros = [probe(domain) for domain in domains] # 7
for coro in asyncio.as_completed(coros): # 8
domain, found = await coro # 9
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
asyncio.run(main()) # 10
# out:
'''
+ in.dev
+ try.dev
+ as.dev
+ def.dev
+ and.dev
+ from.dev
+ del.dev
+ not.dev
is.dev
else.dev
if.dev
true.dev
none.dev
with.dev
for.dev
elif.dev
pass.dev
or.dev
'''
1.设置关键字的最大长度,域名越短越好
2.porbe 返回一个元组,包含一个域名和一个布尔值。True 表示可解析
3.获取asyncio 事件循环的引用,供后面使用
4.该方法返回一个五元组,例如:[(<AddressFamily.AF_INET: 2>, 0, 0, '', ('5.77.63.236', 0))]
如果域名不可解析,则会返回socket.gaierror 错误
5.main必定是一个协程,因此可以在主体中使用await
6.一个生成器,产出长度不超过MAX_KEYWORD_LEN 的Python 关键字
7.调用probe协程,传入各个domain,构建一个协程列表对象
8.asyncio.as_completed 是一个生成器,产出协程,按照传入的协程完成的顺序(不是协程的提交顺序)返回结果
9.生成器产出协程后代表协程已结束,因为as_completed 就是这个作用。因此await 表达式不阻塞,但是我们需要
从coro 中获取结果。若coro 抛出的异常未处理,自然在这里重新抛出
10.asyncio.run 启动事件循环,仅当事件循环退出后返回。使用asyncio的脚本经常这样做,即把main 实现为协程
在if __name__ == '__main__':块中使用asyncio.run 驱动
思考: asyncio.as_completed 写法和其他写法的不同?
https://www.jianshu.com/p/eed5da9965f2
使用asyncio 和 HTTPX 下载
import asyncio
from typing import Callable
from pathlib import Path
import time
from httpx import AsyncClient
POP20_CC = ('CH IN US ID BR PK NG BD RU JP'
'MX PH VN ET EG DE IR TR CD FR').split()
DEST_DIR = Path('downloaded')
BASE_URL = 'http://mp.ituring.com.cn/files'
async def get_flag(client: AsyncClient, cc: str):
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = await client.get(url, timeout=6.1, follow_redirects=True)
return resp.read()
def save_flag(img: bytes, filename: str) -> None:
(DEST_DIR / filename).write_bytes(img)
async def download_one(clinet: AsyncClient, cc: str):
image = await get_flag(clinet, cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
async def supervisor(cc_list: list[str]) -> int:
async with AsyncClient() as client:
to_do = [download_one(client, cc) for cc in cc_list]
res = await asyncio.gather(*to_do)
return len(res)
def download_many(cc_list: list[str]) -> int:
return asyncio.run(supervisor(cc_list))
def main(downloader: Callable[[list[str]], int]) -> None:
DEST_DIR.mkdir(exist_ok=True)
t0 = time.perf_counter()
count = downloader(POP20_CC)
elapsed = time.perf_counter() - t0
print(f'\n{count} downloads in {elapsed:.2f}s')
if __name__ == '__main__':
main(download_many)
原生协程的秘密
asyncio 事件循环在背后调用.send 驱动你的协程,而你的协程使用await 等待其他协程,包括标准库提供的协程。await 的实现大量借鉴yield from 也调用.send 驱动协程
await 链最终到达一个底层可异步调用的对象,返回一个生成器,由事件循环驱动。对计时器或网络I/O等事件做出响应。位于await链末端的底层可异步调用对象深埋在库的实现中, 不作为API开放,有可能是Python/C扩展
使用asyncio.gahter 和 asyncio.create_task 等函数可以启动多个并发await 通道,在单个线程内由单个事件循环驱动多个I/O操作并发执行