python多进程运行

0Crush / 2023-08-11 / 原文

多进程并行的模块:multiprocessing

ultiprocessing包的详解需要涉及到以下几个方面:

  • 进程类(Process):这是multiprocessing包的核心类,它用于创建和管理一个进程。进程类接受一个target参数,指定要在进程中执行的函数,以及一个args参数,指定要传递给函数的参数。进程类还有一些方法和属性,如start()、join()、terminate()、pid等,用于控制和获取进程的状态。

1.1、run()
       表示进程运行的方法。可以在子类中重写此方法。标准run() 方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别使用args和kwargs参数中的顺序和关键字参数。

1.2、start()
       进程准备就绪,等待CPU调度。

1.3、join([ 超时] )
       如果可选参数timeout是None,则该方法将阻塞,直到join()调用其方法的进程终止。如果timeout是一个正数,它最多会阻塞超时秒。请注意,None如果方法的进程终止或方法超时,则返回该方法。检查进程exitcode以确定它是否终止。

1.4、name
       进程的名称。该名称是一个字符串,仅用于识别目的。

1.5、is_alive()
       返回进程是否存活。从start() 方法返回到子进程终止的那一刻,进程对象仍处于活动状态。

1.6、daemon
       进程的守护进程标志,一个布尔值。必须在start()调用之前设置,当进程退出时,它会尝试终止其所有守护进程子进程。因为主进程会等子进程运行完毕后结束,所以要设置子进程守护主进程。

1.7、pid
       返回进程ID。在产生该过程之前,这将是 None。

1.8、exitcode
       子进程的退出代码。None如果流程尚未终止,这将是负值-N表示该子进程被信号N终止。

  • 进程池类(Pool):这是multiprocessing包提供的一个便利类,它可以创建一个固定数量的进程池,用于执行一组任务。进程池类有一些方法,如map()、apply()、apply_async()等,用于向进程池提交任务,并获取返回值。进程池类还有一些属性,如max_workers、_pool等,用于设置和访问进程池的配置。
  • 队列类(Queue):这是multiprocessing包提供的一个重要类,它用于实现进程间的通信。队列类是一个先进先出(FIFO)的数据结构,它可以在多个进程之间安全地传递任何可pickle的对象。队列类有一些方法,如put()、get()、empty()等,用于向队列中添加或移除数据,并检查队列的状态。

 

  • 管道函数(Pipe):这是multiprocessing包提供的一个简单的通信机制,它返回一对连接对象,分别表示管道的两端。管道函数有一个duplex参数,用于指定管道是否为双向的。连接对象有一些方法,如send()、recv()、close()等,用于在管道中发送或接收数据,并关闭连接。
  • 值类(Value)和数组类(Array):这是multiprocessing包提供的两个类,它们用于在多个进程之间共享数据。值类和数组类都需要指定一个类型码和一个初始值,用于创建一个共享内存空间。值类和数组类都有一个value属性或者buffer属性,用于访问或修改共享数据。
使用Array对象和Value对象在进程之间共享数据:
import multiprocessing 

def square_list(mylist, result, square_sum): 
    # append squares of mylist to result array 
    for idx, num in enumerate(mylist): 
        result[idx] = num * num 

    # 创建一个value对象
    square_sum.value = sum(result) 

    # 打印结果数组 Array
    print("Result(in process p1): {}".format(result[:])) 

    # 打印 square_sum Value 
    print("Sum of squares(in process p1): {}".format(square_sum.value)) 

if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 

    # creating Array of int data type with space for 4 integers('i':int数据类型:整数) 
    # 在主程序中我们首先创建了一个Array对象:包含4个元素的数组
    result = multiprocessing.Array('i', 4) 

    # creating Value of int data type 
    # 创建了一个Value对象
    square_sum = multiprocessing.Value('i') 

    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum)) 

    # starting process 
    p1.start() 

    # wait until process is finished 
    p1.join() 

    # print result array 
    print("Result(in main program): {}".format(result[:])) 

    # print square_sum Value 
    print("Sum of squares(in main program): {}".format(square_sum.value))
    #在函数中result元素通过索引进行数组赋值,square_sum通过value属性进行赋值。

# 注意:为了完整打印result数组的结果,需要使用result[:]进行打印,而square_sum也需要使用value属性进行打印:
  • 同步原语(Synchronization Primitives):这是multiprocessing包提供的一些类和函数,它们用于实现多个进程之间的同步和协调。同步原语包括锁(Lock)、信号量(Semaphore)、事件(Event)、条件变量(Condition)、屏障(Barrier)等。同步原语可以防止多个进程同时访问共享资源或者保证多个进程按照一定的顺序执行。