Python - 异步编程

佚名 / 2023-05-14 / 原文

目录
  • asyncio.as_complete 即时获得协程返回的结果
  • 使用asyncio 和 HTTPX 下载
  • 原生协程的秘密

asyncio.as_complete 即时获得协程返回的结果

import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4 # 1

async def probe(domain: str) -> tuple[str, bool]: # 2
    loop = asyncio.get_running_loop() # 3
    try:
        await loop.getaddrinfo(domain, None) # 4
    except socket.gaierror:
        return (domain, False)
    return (domain, True)


async def main() -> None: # 5
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # 6
    domains = (f'{name}.dev'.lower() for name in names) 
    coros = [probe(domain) for domain in domains] # 7

    for coro in asyncio.as_completed(coros): # 8
        domain, found = await coro # 9
        mark = '+' if found else ' '
        print(f'{mark} {domain}')

if __name__ == '__main__':
    asyncio.run(main()) # 10

# out:
'''
+ in.dev
+ try.dev
+ as.dev
+ def.dev
+ and.dev
+ from.dev
+ del.dev
+ not.dev
  is.dev
  else.dev
  if.dev
  true.dev
  none.dev
  with.dev
  for.dev
  elif.dev
  pass.dev
  or.dev
'''
1.设置关键字的最大长度,域名越短越好
2.porbe 返回一个元组,包含一个域名和一个布尔值。True 表示可解析
3.获取asyncio 事件循环的引用,供后面使用
4.该方法返回一个五元组,例如:[(<AddressFamily.AF_INET: 2>, 0, 0, '', ('5.77.63.236', 0))]
  如果域名不可解析,则会返回socket.gaierror 错误
5.main必定是一个协程,因此可以在主体中使用await
6.一个生成器,产出长度不超过MAX_KEYWORD_LEN 的Python 关键字
7.调用probe协程,传入各个domain,构建一个协程列表对象
8.asyncio.as_completed 是一个生成器,产出协程,按照传入的协程完成的顺序(不是协程的提交顺序)返回结果
9.生成器产出协程后代表协程已结束,因为as_completed 就是这个作用。因此await 表达式不阻塞,但是我们需要
  从coro 中获取结果。若coro 抛出的异常未处理,自然在这里重新抛出
10.asyncio.run 启动事件循环,仅当事件循环退出后返回。使用asyncio的脚本经常这样做,即把main 实现为协程
  在if __name__ == '__main__':块中使用asyncio.run 驱动

思考: asyncio.as_completed 写法和其他写法的不同?

https://www.jianshu.com/p/eed5da9965f2

使用asyncio 和 HTTPX 下载

import asyncio
from typing import Callable
from pathlib import Path
import time
from httpx import AsyncClient

POP20_CC = ('CH IN US ID BR PK NG BD RU JP'
            'MX PH VN ET EG DE IR TR CD FR').split()

DEST_DIR = Path('downloaded')
BASE_URL = 'http://mp.ituring.com.cn/files'


async def get_flag(client: AsyncClient, cc: str):
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=6.1, follow_redirects=True)
    return resp.read()


def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)


async def download_one(clinet: AsyncClient, cc: str):
    image = await  get_flag(clinet, cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)


async def supervisor(cc_list: list[str]) -> int:
    async with AsyncClient() as client:
        to_do = [download_one(client, cc) for cc in cc_list]

        res = await asyncio.gather(*to_do)
    return len(res)


def download_many(cc_list: list[str]) -> int:
    return asyncio.run(supervisor(cc_list))


def main(downloader: Callable[[list[str]], int]) -> None:
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')


if __name__ == '__main__':
    main(download_many)

原生协程的秘密

asyncio 事件循环在背后调用.send 驱动你的协程,而你的协程使用await 等待其他协程,包括标准库提供的协程。await 的实现大量借鉴yield from 也调用.send 驱动协程

await 链最终到达一个底层可异步调用的对象,返回一个生成器,由事件循环驱动。对计时器或网络I/O等事件做出响应。位于await链末端的底层可异步调用对象深埋在库的实现中, 不作为API开放,有可能是Python/C扩展

使用asyncio.gahter 和 asyncio.create_task 等函数可以启动多个并发await 通道,在单个线程内由单个事件循环驱动多个I/O操作并发执行