python异步爬虫

Kang_kin / 2023-05-06 / 原文

异步爬虫

基础知识

阻塞

​ 阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则该程序在操作上是阻塞的。

​ 常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。阻塞是无处不在的,包括在CPU切换上下文时,所有进程都无法真正干事情,它们也会被阻塞。在多核CPU的情况下,正在执行上下文切换操作的核不可被利用

非阻塞

​ 程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情,则称该程序在该操作系统上是非阻塞的。

​ 非阻塞并不是在任何程序级别、任何情况下都存在的。仅当程序封装的级别可以囊括独立的子程序单元时,程序才能存在非阻塞状态。

​ 非阻塞因阻塞的存在而存在,正因为阻塞导致程序运行的耗时增加与效率低下,我们才需要将它变为非阻塞的。

同步

不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致,此时这些程序单元是同步执行的。

例如在购物系统中更新商品库存时,需要用“行锁”作为通信信号,强制让不同的更新请求排队并按顺序执行,这里的更新库存操作就是同步的。

异步

​ 为了完成某个任务,不同程序单元之间无需通信协调也能完成任务,此时不限同的程序单元之间可以是异步的。

​ 例如,爬取下载网页。调度程序调用下载程序后,既可调度其他任务,无须与该下载任务保持通信以协调行为。不同网页的下载,保存等操作都是无关的,也无需相互通知协调。

多进程

多进程利用CPU的多核优势,在同一时间并行执行多个任务,可以大大提高执行效率

协程

​ 协程是一种运行在用户态的轻量级线程。

​ 协程拥有自己的寄存器上下文和栈。协程在调度切换时,将寄存器上下文和栈保存到其他地方,等切回来的时候,在恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入,就相当于进入上一次调用的状态。

​ 协程本质上是一个单进程,相对多线程来说,它没有线程上下文切换的开销,没有原子操作锁定及同步到开销。

定义协程

  1. event_loop:事件循环,相当于一个无限循环,可以把一些函数注册到这个事件循环上,当满足发生条件的时候,就调用对应的处理方法。
  2. coroutine:协程。在Python中指协程对象,将协程对象注册到事件循环中,它会被事件循环调用。可以使用async关键字定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  3. task:任务,这是对协程对象的进一步封装,包含协程对象的各个状态
  4. future:代表将来执行或者没有执行的任务的结果,实际上和task没有本质区别
import asyncio

async def execute(x):
    print('Number', x)

# 返回一个协程对象    
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')

# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象注册到事件循环中启动
loop.run_until_complete(coroutine)
print('After calling loop')

# Coroutine <coroutine object execute at 0x1045e5be0>
# After calling execute
# Number 1
# After calling loop

# async定义的方法会变成一个无法直接执行的协程对象,必须要注册到事件循环中才能运行

使用task封装对象

import asyncio

async def execute(x):
    print('Number', x)
    return x

# 返回一个协程对象    
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')

# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象转换为task对象
task = loop.create_task(coroutine)
print('Task:',task)

# 将协程对象注册到事件循环中启动
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

# Coroutine <coroutine object execute at 0x102891be0>
# After calling execute
# Task: <Task pending name='Task-1' coro=<execute() running at >
# Number 1
# Task: <Task finished name='Task-1' coro=<execute() done, defined at  result=1>
# After calling loop

# 将task对象打印,从pending状态变为了finished,并且result变成了1

调用asyncio包的ensure_future方法,返回结果也为task对象,这样可以不用借助loop对象。即使没有声明loop,也可以提前定义task

import asyncio

async def execute(x):
    print('Number', x)
    return x

# 返回一个协程对象    
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')

# 使用方法创建一个task对象
task = asyncio.ensure_future(coroutine)
print('Task:',task)
# 创建一个时间循环loop
loop = asyncio.get_event_loop()

# 将协程对象注册到事件循环中启动
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

多任务协程

import asyncio

tasks = [asyncio.ensure_future(req_baidu()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())
    
# Tasks: [<Task pending name='Task-1' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-2' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-3' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-4' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-5' coro=<req_baidu() running at asysncioDemo.py:15>>]
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>

协程实现

当遇到一个网站需要等待页面响应返回结果时,合理的利用协程

import asyncio
import requests
import time

start = time.time()

async def req():
  	url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = request.get(url)
    print('Get response from',url,'response',response)

tasks = [asyncio.ensure_future(req_baidu()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:',end - start )

请求耗时66秒,并没有实现异步处理,使用await关键字将耗时等待挂起,让出控制权,如果协程在执行的时候遇到await,事件循环就会将本次协程挂起,转而执行别的协程,直到其他协程挂起或执行完毕。

async def req():
  	url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = await requests.get(url)
    print('Get response from',url,'response',response)

# Task exception was never retrieved 
# future: <Task finished coro=<request() done,defined at ...
# TypeError: object Response can't be used in 'await' expression 

错误吓死requests返回的Response对象不能喝await一起使用:

  • 一个原生协程对象
  • 一个由types.coroutine修饰的生成器,这个生成器可以返回协程对象
  • 由一个包含**_await _ **方法的对象返回的迭代器

可以将async请求改成一个协程对象

async def req_baidu():
    return requests.get('https://www.baidu.com/')

async def await_demo():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await req_baidu()
    print('Get response from', url, 'response', response)

start = time.time()
tasks = [asyncio.ensure_future(await_demo()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time', end - start)

aiohttp

import asyncio
import time

import aiohttp

start = time.time()


async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response


async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'response:', response)


if __name__ == '__main__':
    tasks = [asyncio.ensure_future(request()) for _ in range(10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print('Cost time', end - start)

开始运行时,事件循环会运行第一个task,对于第一个task来说,当执行到第一个await跟着的get方法时,它会被挂起,但这个get方法第一步的执行是非阻塞的,挂起之后会立马被唤醒,立即又进入执行。接着遇到第二个await,调用Session.get请求,然后被挂起。然后事件循环会寻找当前未被挂起的协程继续执行,依次类推。

基本爬取

import asyncio

import aiohttp

async def fetch(session, url):
    async with session.get(url,ssl=False) as response:
        return await response.text(), response.status


async def main():
    async with aiohttp.ClientSession() as session:
        html, status = await fetch(session, 'https://www.baidu.com')
        print(f'html:{html[:100]}')
        print(f'status:{status}')
        
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

这里完成了一次基本的HTTP请求,和requests请求不同的地方:

  1. 除了必须引入aiohtto库,还需要引入asyncio。因为要实现异步爬取需要启动协程,而协程则需要借助asyncio里的事件循环才能执行。除了事件循环asyncio也提供了很多基础的异步操作
  2. 异步爬取方法的没个前面都要统一的加async来修饰
  3. with as语句同样需要async来修饰,在Python中,with as语句用于声明一个上下文管理器,帮助我们自动分配和释放资源。在异步方法中,with as前面加上async代表声明一个支持异步的上下文管理器
  4. 对于返回协程对象的操作,前面需要加await来修饰。例如Response调用的text()方法,返回的是一个协程对象,而状态码则是一串数值。

URL参数设置

import aiohttp
import asyncio

async def main():
  	# 利用params参数传入一个字典
  	params={'name':'kang','age',18}
    async with aiohttp.ClientSession() as session:
      	async with session.get('https://www.htpbin.org/get',params=params) as response:
          	print(await response.text())
            
if __name__=='__main__':
  	asyncio.get_event_loop().run_until_complete(main())
{
  "args": {
    "age": "18", 
    "name": "kang"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.11 aiohttp/3.8.4", 
    "X-Amzn-Trace-Id": "Root=1-6454e482-0599efda04b2a55f6a7a0e14"
  }, 
  "origin": "139.5.108.229", 
  "url": "https://www.httpbin.org/get?name=kang&age=18"
}

其他类型请求

# 与requests库中的请求方法类似
session.post('http://www.httpbin.org/post',data=b'data')
session.put('http://www.httpbin.org/put',data=b'data')
session.delete('http://www.httpbin.org/delete',data=b'data')
session.head('http://www.httpbin.org/get')
session.options('http://www.httpbin.org/options')
session.patch('http://www.httpbin.org/patch',data=b'data')

POST请求

对于post表单提交,对于请求头中的Content-Type=application/x-www-form-urlencoded

在session请求中改为 data={'name':'kang','age':18} session.post('https://www.httpbin.org/post',data=data)

json数据提交对应请求头中的Content-type=application/json

session.post('https://www.httpbin.org/post',json=data)

响应

import aiohttp
import asyncio

async def main():
  	# 利用params参数传入一个字典
  	params={'name':'kang','age',18}
    async with aiohttp.ClientSession() as session:
      	async with session.get('https://www.htpbin.org/get',params=params) as response:
          	print('status:', response.status)
            print('body:', await response.text())
            print('headers:', response.headers)
            print('bytes:', await response.read())
            print('json:', await response.json())
            
if __name__=='__main__':
  	asyncio.get_event_loop().run_until_complete(main())
status: 200
body: {
  "args": {
    "age": "18", 
    "name": "kang"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.11 aiohttp/3.8.4", 
    "X-Amzn-Trace-Id": "Root=1-6454e730-7a6e883f275648b50e2d39c6"
  }, 
  "origin": "139.5.108.229", 
  "url": "https://www.httpbin.org/get?name=kang&age=18"
}

headers: <CIMultiDictProxy('Date': 'Fri, 05 May 2023 11:23:31 GMT', 'Content-Type': 'application/json', 'Content-Length': '375', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
bytes: b'{\n  "args": {\n    "age": "18", \n    "name": "kang"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Host": "www.httpbin.org", \n    "User-Agent": "Python/3.11 aiohttp/3.8.4", \n    "X-Amzn-Trace-Id": "Root=1-6454e730-7a6e883f275648b50e2d39c6"\n  }, \n  "origin": "139.5.108.229", \n  "url": "https://www.httpbin.org/get?name=kang&age=18"\n}\n'
json: {'args': {'age': '18', 'name': 'kang'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.11 aiohttp/3.8.4', 'X-Amzn-Trace-Id': 'Root=1-6454e730-7a6e883f275648b50e2d39c6'}, 'origin': '139.5.108.229', 'url': 'https://www.httpbin.org/get?name=kang&age=18'}

在返回一个协程对象时,有些字段需要使用await修饰。具体如下:

  1. 发送请求的方法名前需要加上await关键字,例如:await session.get(url)
  2. 响应的数据需要使用await修饰的部分,包括:
  • response.text(): 用于获取响应体中的文本内容。
  • response.json(): 用于获取响应体中的JSON格式数据。
  • response.read(): 用于获取响应体的原始字节流。
  1. 释放底层连接资源和关闭会话对象时,需要使用await修饰的方法,包括:
  • response.release(): 用于释放底层连接资源。
  • response.raise_for_status(): 用于检查响应状态码是否为200,并在不是200的情况下抛出异常。
  • session.close(): 用于关闭会话对象并释放所有底层资源。

aiohttp 官方文档

超时设置

利用ClienTimeout对象设置超时 timeout=aiohttp.ClientTimeout(total=1),aiohttp.ClientSession(timeout=timeout)

设置一秒的超时时间,如果超时会抛出TimeoutError,其类型为asyncio.TimeoutError

并发限制

import asyncio

import aiohttp

# 最大并发量
CONCURRENCY = 5
URL = 'https://www.baidu.com/'

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api():
    async with semaphore:
         print('scraping', URL)
         async with session.get(URL) as response:
             await asyncio.sleep(1)
             return await response.text()

async def main():              
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(con) for _ in range(100)]
    await asyncio.gather(*scrape_index_tasks)   
    
if __name__ == '__main__':
		asyncio.get_event_loop().run_until_complete(main())    
    
# 声明了100个task,传入gather方法后如果不限制将会同时执行,设置最大并发量后,会被控制在5个

asyncio.gather() 可以接受零个或多个协程对象作为输入,并返回一个协程对象,该协程对象将在所有输入协程完成后运行完成。如果有任何输入协程引发异常,则 asyncio.gather() 返回的协程也会引发相同的异常。