python-使用multiprocessing.starmap()在进程之间共...

codeday· 2019-10-23
本文来自 codeday ,作者 codeday
我想使用multiprocessing.Value multiprocessing.Lock在单独的进程之间共享一个计数器.例如:

import itertools as it
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
        print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))
    print(counter.value())

这将引发以下异常:

RuntimeError: Synchronized objects should only be shared between
processes through inheritance

我最困惑的是,一个相关的(虽然不是完全相似的)模式可以与multiprocessing.Process()一起使用:

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()

现在,我认识到这是两个明显不同的事物:

>第一个示例使用了多个等于cpu_count()的工作进程,并在它们之间分割了一个可迭代范围(25)
>第二个示例创建25个工作流程和任务,每个工作流程和任务只有一个输入

就是说:如何以这种方式与pool.starmap()(或pool.map())共享实例?

我已经看到过类似的问题hereherehere,但是这些方法似乎不适用于.map()/.starmap(),因为Value是否使用ctypes.c_int都是如此.

我意识到这种方法在技术上是可行的:

def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
        print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))

这真的是最佳做法吗?

最佳答案
使用Pool时遇到的RuntimeError是因为在通过(内部池)队列发送到工作进程之前,已对池方法的参数进行了腌制.
您要使用哪种池方法与此处无关.当您仅使用Process时,不会发生这种情况,因为不涉及队列.您可以仅使用pickle.dumps(multiprocessing.Value(‘i’,0))重现该错误.

您上一个代码段不符合您的想法.您没有共享价值,而是为每个子进程重新创建了独立的计数器.

如果您在Unix上并使用默认的启动方法“ fork”,则只需不将共享对象作为参数传递给池方法即可.
您的子进程将通过派生继承全局变量.使用process-start-methods“ spawn”(默认Windows)或“ forkserver”时,您必须在Pool期间使用初始化程序
实例化,以使子进程继承共享对象.

请注意,您不需要额外的multiprocessing.Lock锁定在这里,因为multiprocessing.Value默认情况下带有内部可使用的值.

import os
from multiprocessing import Pool, Value #, set_start_method


def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
            print(f'{os.getpid()} | counter incremented to: {cnt.value}
')


def init_globals(counter):
    global cnt
    cnt = counter


if __name__ == '__main__':

    # set_start_method('spawn')

    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000