python进程池的阻塞和非阻塞

xjsrm / 2023-08-20 / 原文

进程池

概念:

当需要上百上千个目标时,手动创建进程工作量大,此时就可以用到multiprocessing模块中提供的pool方法。

初始化pool时,可以指定一个最大的进程数,当有新的请求提交到pool中时,如果pool还没有满,那么就会创造一个新的进程用于执行该请求。如果pool中的进程数满了,那么就需要等待,直到pool中有进程结束,才可以接着创造进程。

池子有两种类型:一种是阻塞式,一种是非阻塞式。
阻塞式:添加一个执行一个,执行一个才返回紧接着添加下一个。(没有特别大的意义)
非阻塞式:全部添加到队列中,立刻返回结果,并没有等待pool执行完毕,当执行完毕之后才可以调用回调函数。

进程池创建的进程和主进程是共同存在的关系,只有主进程存在,pool中的子进程才存在

方法:

apply_async():非阻塞,apply_async(func[, args=()[, kwds={}[, callback=None]]])

apply() :阻塞,apply(func, args=(), kwds={})

close() : 关闭进程池(pool),使其不在接受新的任务。

join() :主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

terminal() : 结束工作进程,不在处理未处理的任务。

非阻塞示例:

import time
from multiprocessing import Pool
#非阻塞式
def task(task_name):
    print("开始做任务",task_name,os.getpid())
    time.sleep(3)
    print("任务结束")

if __name__ =="__main__":
    pool = Pool(3)			#设置pool容量为3
    list =["1","2","3",'4','5','6']
    for i in list:
        pool.apply_async(task,args=(i,))
    pool.close()
    pool.join()
    print("over")

结果:

开始做任务 1 21492
开始做任务 2 1508
开始做任务 3 11188
任务结束任务结束任务结束


开始做任务 4 11188
开始做任务 5 21492
开始做任务 6 1508
任务结束任务结束
任务结束

over

callback参数的使用方法

import time
from multiprocessing import Pool
#非阻塞式
def task(task_name):
    print("开始做任务",task_name,os.getpid())
    time.sleep(3)
    print("结束")
    return (task_name)
    
container = []
def callback_func(n):			#回调函数必须要有形参,形参用来接收返回值
    container.append(n)

if __name__ =="__main__":
    pool = Pool(5)
    list =["1","2","3",'4','5','6']
    for i in list:
        pool.apply_async(task,args(i,),callback=callback_func)
    pool.close()
    pool.join()
    print(container)

回调方法(callback)可以把函数返回的值进行调用,不过需要自行设置一个回调函数。

阻塞式

特点:

添加一个,执行一个进程,如果进程不结束,后面的进程就进不了pool。

示例:

import os
import time
from multiprocessing import Pool

def task(task_name):
    print("开始做任务",task_name,os.getpid())
    time.sleep(1)
    print("结束")
    print(task_name)
if __name__=="__main__":
    pool = Pool(2)
    list = ["1", "2", "3", '4', '5', '6']
    for i in list:
        pool.apply(task, args=(i,))
    pool.close()
    pool.join()

阻塞式没有回调函数