【问题标题】:multiprocessing/threading: data appending & output return多处理/线程:数据附加和输出返回
【发布时间】:2017-05-26 19:54:20
【问题描述】:

我在下面有一个名为 run 的冗长函数,其中包含一些附加数据的实例。

from multiprocessing import Process

data = []

def run():
    global data
    ...
    data.append(trace)
    ...

if __name__ == '__main__':
    jobs = []

    gen_count = 0
    leaked_count = 0
    system_count = 0

    N = 100

    for i in range(N):
        p = Process(target=run)
        jobs.append(p)
        p.start()

但是,使用多处理不会附加任何数据。此外,函数run 返回几个需要添加到gen_countleaked_countsystem_count 的值,我不知道如何检索这些值。我选择了多处理,因为在 for 循环中运行 run 很慢,并且每次迭代都独立于其余部分。我想稍后在此代码中加入 GPU 加速,以供任何对此有任何想法的人使用。

所以我的问题如下:

  1. 我什至应该使用多处理而不是线程?
  2. 为什么trace 没有附加到data
  3. 如何在多处理块中检索run 的输出?

编辑:

from plotly.offline import init_notebook_mode
import plotly.graph_objs as go
import plotly as py
import time
import Cross_Section_Loading
from multiprocess import Process, Pool, Queue, Manager, cpu_count
from functools import partial
import numpy as np
init_notebook_mode(connected=True)

...

def particle_func(x, y, z):

    leaked = 0
    nu = 0

    # get initial direction
    theta = np.random.uniform(0, np.pi, 1)
    phi = np.random.uniform(0, 2 * np.pi, 1)

    # compute energy via rejection sampling
    expfiss = lambda e: 0.453 * np.exp(-1.036 * e / 1.0e6) * np.sinh(np.sqrt(2.29 * e / 1.0e6))

    min_eng = np.min(E)
    max_eng = np.max(E)
    max_prob = np.max(expfiss(E))

    rejected = 1
    while rejected:
        a = np.random.uniform(min_eng, max_eng, 1)
        b = np.random.uniform(0, max_prob, 1)
        rel_prob = expfiss(a)
        if b <= rel_prob:
            energy = a
            rejected = 0

    alive = 1

    # vector to keep track of positions
    xvec = np.ones(1) * x
    yvec = np.ones(1) * y
    zvec = np.ones(1) * z

    while alive:
        # Get real/new cross-sections for corresponding energy
        index = energy_lookup(E, energy)

        interacted = 0
        total_distance = 0
        # Interacted may still be alive (scattering)
        while interacted == 0:

            ###################################################
            # Determine starting location for sample distance using sigma_total
            material_start = material_type(x, y)

            if material_start == 1:
                sig_tot = sigma_total_fuel(ENRICHMENT_1)[index]
            elif material_start == 2:
                sig_tot = sigma_total_fuel(ENRICHMENT_2)[index]
            elif material_start == 3:
                sig_tot = sigma_total_fuel(ENRICHMENT_3)[index]
            else:
                sig_tot = sigma_total_mod[index]

            ###################################################

            if material_start == 1 or material_start == 2 or material_start == 3:  # if in fuel pin

                # Get distance to edge of fuel rod (from fuel)
                d = distance_to_edge(x, y, phi)

                # get sample distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            else:               # if in moderator
                # get distance to edge of fuel rod (from moderator)
                d = distance_to_edge(x, y, phi)

                # get distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)  # <- Right conversion?

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            # move particle
            z += total_distance * np.cos(theta)
            y += total_distance * np.sin(theta) * np.sin(phi)
            x += total_distance * np.sin(theta) * np.cos(phi)

        # material_end = material_type(x, y)
        #
        # if material_start != material_end:
        #     print("Neutron has crossed material interface(s)")

        # Trace/Track particle movement
        xvec = np.append(xvec, x)
        yvec = np.append(yvec, y)
        zvec = np.append(zvec, z)

        ###################################################

        # Leakage
        if x > X_BOUNDARY or x < -X_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if y > Y_BOUNDARY or y < -Y_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if z > HEIGHT or z < 0:
            # Still need implementation
            leaked = 1
            alive = 0

        ###################################################

        # Determine Type of interaction based on energy and corresponding cross-sections
        # In fuel
        material = material_type(x, y)
        if material == 1:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_1)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_1)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_1)[index]
            nu_temp = nu_fuel(ENRICHMENT_1)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        elif material == 2:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_2)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_2)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_2)[index]
            nu_temp = nu_fuel(ENRICHMENT_2)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        if material == 3:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_3)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_3)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_3)[index]
            nu_temp = nu_fuel(ENRICHMENT_3)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        # In water
        else:
            mod_scat = sigma_scatter_mod[index]
            mod_tot = sigma_total_mod[index]

            # scatter or absorb
            if np.random.random(1) < mod_scat / mod_tot:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_mod * energy, energy, 1)

            else:
                # absorbed
                alive = 0

        ###################################################

    return xvec, yvec, zvec, nu, leaked


##################################################################


def run(data_test):

    ###################################################

    # Uniformly Dispersed Source (Cylinder)
    # x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
    # y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
    # z = np.random.uniform(-HEIGHT, HEIGHT, 1)

    # Uniformly Dispersed FUEL Source (Cylinder)
    rejected = 1
    while rejected:
        x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
        y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
        z = np.random.uniform(-HEIGHT, HEIGHT, 1)
        if material_type(x, y):
            rejected = 0

    ###################################################

    # Get normal particle info (trace)
    x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
    leaked_count = leaked
    gen_count = nu
    system_count = (1 + nu - leaked)

    # particle_trace = go.Scatter3d(
    #     x=x_vec,
    #     y=y_vec,
    #     z=z_vec,
    #     mode='lines',
    #     line=dict(color='rgb(173, 255, 47)')
    # )
    #
    # data_test.append(particle_trace)

    data_test.append((x_vec, y_vec, z_vec))

    ###################################################

    nu_vec = [nu]
    x_vecs = [x_vec]
    y_vecs = [y_vec]
    z_vecs = [z_vec]

    if nu > 0:
        print("{} neutrons generated for neutron {}".format(nu, i))

    else:
        print("No neutrons generated for neutron {}".format(i + 1))

    t = 0
    recent_nus = nu_vec
    while np.any(recent_nus) != 0:

        print(nu_vec[-t:])

        tracker = 0

        nu_vec_temp = []

        x_vecs_temp = []
        y_vecs_temp = []
        z_vecs_temp = []

        for a in range(len(nu_vec[-t:])):

            x = x_vecs[-(a + 1)][-1]
            y = y_vecs[-(a + 1)][-1]
            z = z_vecs[-(a + 1)][-1]

            for j in range(nu_vec[-(a + 1)]):
                x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
                leaked_count += leaked

                print("Particle {} starting coords:".format(j + 1), x_vec[0], y_vec[0], z_vec[0])
                print("Particle {} ending coords:".format(j + 1), x_vec[-1], y_vec[-1], z_vec[-1])
                print("Particle {} nu value".format(j + 1), nu)

                nu_vec_temp.append(nu)
                tracker += 1

                x_vecs_temp.append(x_vec)
                y_vecs_temp.append(y_vec)
                z_vecs_temp.append(z_vec)

                # time.sleep(1)

                # particle_trace = go.Scatter3d(
                #     x=x_vec,
                #     y=y_vec,
                #     z=z_vec,
                #     mode='lines',
                #     line=dict(color='rgb(255, 0, 0)')
                # )

                # data_test.append(particle_trace)
                data_test.append((x_vec, y_vec, z_vec))

            print()
            t = tracker

        nu_vec.extend(nu_vec_temp)
        x_vecs.extend(x_vecs_temp)
        y_vecs.extend(y_vecs_temp)
        z_vecs.extend(z_vecs_temp)

        recent_nus = nu_vec_temp

        print("Continuing fission:", (np.any(recent_nus) != 0))

    return leaked_count, gen_count, system_count

##################################################################

if __name__ == '__main__':
    jobs = []

    manager = Manager()
    list_ = manager.list()
    for _ in range(cpu_count() - 1):
        p = Process(target=run, args=(list_,))
        jobs.append(p)
        p.start()
        p.join()
    while True:  # stops main thread from completing execution
        time.sleep(5)  # wait 5 second before checking if processes are terminated
        if all([not x.is_alive() for x in jobs]):  # check if all processes terminated
            break  # breaks the loop

# print("\nTotal number of neutrons in system:", system_count)
# print("Total number of neutrons generated from {} neutron source: {}".format(N, gen_count))
# print("System Multiplication factor:", gen_count/N)
# print("Total number of leaked neutrons:", leaked_count)


layout = go.Layout(
    title='Monte Carlo Assembly',
    autosize=True,
    showlegend=False,
    height=1000,
    width=1000,
    scene=dict(zaxis=dict(range=[-1, HEIGHT + 1]),
               yaxis=dict(range=[-(Y_DIM * PITCH + 5), (Y_DIM * PITCH + 5)]),
               xaxis=dict(range=[-(X_DIM * PITCH + 5), (X_DIM * PITCH + 5)])
               ),
)

fig = go.Figure(data=data, layout=layout)
py.offline.plot(fig, filename='file.html')

输出和错误信息:

/Users/sterlingbutters/anaconda/bin/python "/Users/sterlingbutters/PycharmProjects/Monte Carlo Simulation/MC Plotly (Cylindrical Assembly) Reflector.py"
For 17 x 17 assembly, 9 x 9 is needed. Your shape: (9, 9)
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
[(array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294]))]
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable

Process finished with exit code 0

【问题讨论】:

  • 您需要一个共享内存中的对象(目前,每个进程都有自己的数据结构副本),因此您可以使用Manager 等等,例如Queue。由于 GIL,线程几乎肯定不会加快速度。正如你所拥有的那样,产生一个进程也不会加快速度。要么事先分好锅,然后将块发送到不同的进程,要么将整个工作放到Pool
  • 你会如何在上面实现一个池?前段时间我做了一些尝试,但没有成功
  • 您的建议是什么:队列还是池?
  • 另外,我错过了很糟糕的for 循环。您正在生成 100 个进程,这几乎可以肯定比您的核心数量还要多。将其设置为合理的值(可能核心数 - 1)并相应地划分工作
  • 好吧,我的笔记本电脑有 4 个理论上的核心,所以如果我有一个庞大的列表,我必须以同样的方式处理其成员,我会将该列表分成 3 个,生成 3 个进程并发送这些中的每一个都处理他们自己的,独特的,该列表的一部分。然后重新组合结果。

标签: python return append python-multithreading python-multiprocessing


【解决方案1】:

多处理生成一个不同的进程,它拥有从当前环境复制的全局变量。在该流程中对变量所做的所有更改都不会反映在父流程中。你需要在进程之间共享内存,而共享内存中的变量可以进行交换。

您可以使用multiprocessing.Manager 创建一个共享对象,如列表或字典,并操作该对象。

进程被分配给处理器的不同内核/线程。如果您有 4 核/8 线程系统,则最多生成 7 个进程以最大限度地提高性能,除此之外,某些进程会干扰其他进程,并且会减慢/减少分配给您的操作系统的 CPU 时间,这可能会使您的系统崩溃.始终是 cpu 核心/cpu 线程 -1 进程用于稳定处理,至少有一个核心留给 os 来处理其他操作。

你可以像这样修改你的代码

from multiprocessing import Process, Manager
import time

def run(list_):
    list_.append(trace)

if __name__ == "__main__":
    jobs = []
    gen_count = 0
    leaked_count = 0
    system_count = 0

    with Manager() as manager:
        list_ = manager.list()
        for _ in range(multiprocessing.cpu_count()-1):
            p = Process(target=run,args=(list_))
            jobs.append(p)
            p.start()
        while True: #stops main thread from completing execution
            time.sleep(5) #wait 5 second before checking if processes are terminated
            if all([not x.is_alive() for x in jobs]): #check if all processes terminated
                break #breaks the loop 

【讨论】:

  • 感谢您的回答,请查看问题编辑,对问题可能是什么有任何想法?
  • @SterlingButters 您需要等到所有进程都完成执行。我想问题是您的主要代码在所有进程产生之前完成了执行。请参阅此stackoverflow.com/questions/25455462/… 了解更多信息
  • 请看新的错误信息,你认为现在的问题是什么?
  • 我不明白问题出在哪里,你的论点是否可以挑剔?因为那是错误所在的部分
  • 除了用于管理器的 list_ 之外,“运行”没有其他参数。运行 print(dill.pickles(list_)) 给出 True 所以我的猜测是肯定的。
【解决方案2】:

多处理的工作方式是,每个子任务在自己的内存空间中运行,并获取自己的任何全局变量副本。绕过此限制以有效共享数据的常见方法是使用multiprocessing.Manager 来协调对其的并发访问并透明地防止可能导致的任何问题。

以下是根据您的示例代码执行此操作的示例。它还使用multiprocessing.Pool(),这使得创建一个固定大小的流程对象集合变得容易,每个流程对象都可以提供来自每个子任务的异步结果(或者等到它们全部完成后再检索它们,就像这里所做的那样)。

from functools import partial
import multiprocessing

def run(data, i):
    data.append('trace%d' % i)
    return 1, 2, 3  # values to add to gen_count, leaked_count, and system_count

if __name__ == '__main__':
    N = 10
    manager = multiprocessing.Manager()  # create SyncManager
    data = manager.list()  # create a shared list
    pool = multiprocessing.Pool()

    async_result = pool.map_async(partial(run, data), range(N))
    values = tuple(zip(*async_result.get()))
    gen_count = sum(values[0])
    leaked_count = sum(values[1])
    system_count = sum(values[2])

    print(data)
    print('Totals:  gen_count {}, leaked_count {}, system_count {}'.format(
            gen_count, leaked_count, system_count))

输出:

['trace0', 'trace1', 'trace2', 'trace4', 'trace3', 'trace5', 'trace8', 'trace6', 'trace7', 'trace9']
Totals:  gen_count 10, leaked_count 20, system_count 30

【讨论】:

  • 恐怕这不起作用,也许是因为这里列出的相同原因(错误):github.com/jonathanslenders/ptpython/issues/193 这个问题只有 20 天了
  • @SterlingButters:听起来 ptpython 有泡菜问题——这对于进行多处理非常不利(这取决于它)。您链接中的代码对我来说看起来不错。
  • 在我的代码中使用你的算法正好给了我一个 async_result 的空列表...我不知道为什么
  • @SterlingButters:您必须致电async_result.get() 来检索列表。
  • 使用 async_result.get() 最终返回 TypeError: 'NoneType' object is not callable;使用 async_result.get 返回 >
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-01-08
  • 2012-03-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多