tonado 使用2

tslam / 2024-01-22 / 原文

1 介绍

~ 见官方文档 或tonado 使用

2 协程

2.0 异步HTTP请求示例代码

from tornado.httpclient import AsyncHTTPClient


async def asynchronous_fetch(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

~ python3.5 之前实现异步HTTP请求是另外的实现, 详见tornado官方文档

 

2.1如何调用协程

1) 协程中如何引发异常

"""
协程不会以正常方式引发异常:它们引发的任何异常都会被捕获在可等待对象中,直到被yield。
"""
async def divide(x, y):
    return x / y


def bad_call():
    # 这应该引发ZeroDivisionError,但是由于
    # 协程的调用方式不正确,所以不会引发异常。
    divide(1, 0)

 

2) 几乎所有调用协程的函数本身都必须是协程, 调用时使用await或yield关键字

async def good_call():
    # await将解包divide()返回的对象并引发异常。
    await divide(1, 0)

 

3) 若希望`启动并忘记`一个协程, 而不是等待其结果

# IOLoop会捕获异常并在日志中打印堆栈跟踪。注意这看起来不像是普通的调用,
# 因为我们传递要由IOLoop调用的函数对象。
IOLoop.current().spawn_callback(divide, 1, 0)

 

4) 启动IOLoop、运行协程,然后停止IOLoop, 用于启动批处理程序的主函数

# run_sync()不接受参数,所以我们必须用lambda将调用包装起来。
IOLoop.current().run_sync(lambda: divide(1, 0))

 

2.2 协程模式

# 概念: 是指在处理协程时常用的一些模式或技巧
# 目的: 帮助开发者更好地控制协程的执行流程,并实现复杂的异步操作

1) 如何调用阻塞函数

# 使用IOLoop.run_in_executor,它返回与协程兼容的Future对象。
async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)

 

2) 并行处理

# multi函数接受值为Future对象的列表或字典,并且可以并行等待所有这些Future的完成。
from tornado.gen import multi


async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])


async def parallel_fetch_many(urls):
    responses = await multi([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order


async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

3) 交错处理

# 有时,保存一个Future而不立即yield它是有用的,这样可以在等待之前开始另一个操作。
# 通过保存Future对象并在需要时进行yield操作,可以在协程中实现操作的交错执行
from tornado.gen import convert_yielded


async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        yield self.flush()

 

2.3 循环迭代

# 在原生协程中,可以使用async for语法进行循环迭代
# ~ 在旧版本python中 使用携程进行循环迭代比较棘手-见官方文档


2.4 后台运行

"""
这段代码定义了一个名为minute_loop的异步函数(async function)。它的功能是在一个无限循环中执行两个异步操作。

在循环的每次迭代中,首先会调用await do_something(),这表示会等待do_something()函数执行完毕,并且在其完成后继续执行下一步。

接着,代码会调用await gen.sleep(60),这表示会等待60秒的时间。gen.sleep(60)是一个异步操作,它会暂停当前协程的执行,让其他任务有机会运行,然后在指定的时间间隔之后恢复执行。

最后一行代码IOLoop.current().spawn_callback(minute_loop)的作用是将minute_loop协程以后台任务(background task)的形式加入到当前的IOLoop循环中。这样可以保证minute_loop协程会被周期性地执行,而不会阻塞主线程或其他任务的执行。

综合来说,这段代码创建了一个每隔60秒执行一次do_something()操作的循环,并将该循环作为后台任务添加到当前的IOLoop循环中,以实现定期执行的效果
"""
async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)


IOLoop.current().spawn_callback(minute_loop)

 

"""
上面的循环每隔60+N秒运行一次,其中N是do_something()的运行时间。要确保每隔准确的60秒运行一次,可以使用上面提到的交错模式
"""
# >>> 需要实际验证
async def minute_loop2():
    while True:
        nxt = gen.sleep(60)  # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt  # Wait for the timer to run out.