【问题标题】:How to parallelize this situation with robots如何将这种情况与机器人并行化
【发布时间】:2011-04-21 23:04:51
【问题描述】:

我正在研究一个机器人问题。情况是这样的:

  1. 最初有 N 个机器人(通常 N>100 个)都处于静止状态。
  2. 每个机器人都会吸引在其半径r 内的所有其他机器人。
  3. 我有一组方程式,我可以用这些方程式计算加速度、速度以及机器人在时间 deltat 之后的位置。简单来说就是可以在deltat时间之后找到每个机器人的位置。
  4. 我需要做的就是为给定的 deltat。我需要为每个 deltat 显示每个机器人的位置。

问题其实很简单。算法将类似于:

del_t = ;its given
initialPositions = ;its given
num_robots = ;its given

以下代码为每个del_t 执行

robots = range(1,no_robots)
for R in robots:
    for r in robots:
        if  distanceBetween(r,R) <= radius and r is not R:
            acceleration_along_X[R] += xAcceleration( position(r), position(R) )
            acceleration_along_Y[R] += yAcceleration( position(r), position(R) )
    currVelocity_along_X[R] = prevVelocity_along_X[R] + acceleration_along_X[R] * del_t
    currVelocity_along_Y[R] = prevVelocity_along_Y[R] + acceleration_along_Y[R] * del_t
    curr_X_coordinate[R] = prev_X_coordinate[R] + currVelocity_along_X[R] * del_t
    curr_Y_coordinate[R] = prev_Y_coordinate[R] + currVelocity_along_Y[R] * del_t
    print 'Position of robot ' + str(R) + ' is (' + curr_X_coordinate[R] + ', ' + curr_Y_coordinate[R] +' ) \n'
    prev_X_coordinate[R] = curr_X_coordinate[R]
    prev_Y_coordinate[R] = curr_Y_coordinate[R]
    prevVelocity_along_X[R] = currVelocity_along_X[R]
    prevVelocity_along_Y[R] = currVelocity_along_Y[R]

现在我需要并行化算法并设置 MPI 进程的笛卡尔网格。

  1. 因为每个机器人的计算都是一项独立的任务。每个机器人的计算 可以由一个独立的线程来完成。对吗?
  2. 我对 MPI 一无所知。这个“MPI 过程的笛卡尔网格”是什么意思?我该如何设置这个网格?我对此一无所知。

编辑:

现在问题变得有趣了。其实,事情并没有我想的那么简单。看完Unode's answer。我继续通过使用多处理进行并行化来应用他的方法二。

这是代码。 printPositionOfRobot 是我的串行算法。基本上,它应该打印机器人的位置(id robot_id)t=1,2,3,4,5,6,7,8,9,10。 (这里 del_t 取为 1。num_iterations = 10。每个机器人打印如下消息:Robot8 : Position at t = 9 is (21.1051065245, -53.8757356694 )

此代码中存在错误。 t=0 机器人的位置由position() 给出,用于确定 xAcceleration 和 yAcceleration。我们需要使用所有其他粒子先前迭代的位置。

from multiprocessing import Pool
import math


def printPositionOfRobot(robot_id):
    radius = 3
    del_t = 1
    num_iterations = 10
    no_robots = 10

    prevVelocity_along_X = 0
    prevVelocity_along_Y = 0
    acceleration_along_X = 0
    acceleration_along_Y = 0
    (prev_X_coordinate,prev_Y_coordinate) = position(robot_id)#!!it should call initialPosition()
    for i in range(1,num_iterations+1):
        for r in range(no_robots):
            if  distanceBetween(r,robot_id) <= radius and r is not robot_id:
                acceleration_along_X += xAcceleration( position(r), position(robot_id) ) #!! Problem !!
                acceleration_along_Y += yAcceleration( position(r), position(robot_id) )#!! Problem !!
        currVelocity_along_X = prevVelocity_along_X + acceleration_along_X * del_t
        currVelocity_along_Y = prevVelocity_along_Y + acceleration_along_Y * del_t
        curr_X_coordinate = prev_X_coordinate + currVelocity_along_X * del_t
        curr_Y_coordinate = prev_Y_coordinate + currVelocity_along_Y * del_t
        print 'Robot' + str(robot_id) + ' : Position at t = '+ str(i*del_t) +' is (' + str(curr_X_coordinate) + ', ' + str(curr_Y_coordinate) +' ) \n'
        prev_X_coordinate = curr_X_coordinate
        prev_Y_coordinate = curr_Y_coordinate
        prevVelocity_along_X = currVelocity_along_X
        prevVelocity_along_Y = currVelocity_along_Y

def xAcceleration((x1,y1),(x2,y2)):
    s = distance((x1,y1),(x2,y2))
    return 12*(x2-x1)*( pow(s,-15) - pow(s,-7) + 0.00548*s )

def yAcceleration((x1,y1),(x2,y2)):
    s = distance((x1,y1),(x2,y2))
    return 12*(y2-y1)*( pow(s,-15) - pow(s,-7) + 0.00548*s )

def distanceBetween(r,robot_id):
    return distance(position(r), position(robot_id))

def distance((x1,y1),(x2,y2)):
    return math.sqrt( (x2-x1)**2 + (y2-y1)**2 )

def Position(r): #!!name of this function should be initialPosition
    k = [(-8.750000,6.495191) , (-7.500000,8.660254) , (-10.000000,0.000000) , (-8.750000,2.165064) , (-7.500000,4.330127) , (-6.250000,6.495191) , (-5.000000,8.660254) , (-10.000000,-4.330127) , (-8.750000,-2.165064) , (-7.500000,0.000000) ]
    return k[r]

if __name__ == "__main__":
    no_robots = 10  # Number of robots you need
    p = Pool(no_robots)  # Spawn a pool of processes (one per robot in this case)
    p.map(printPositionOfRobot, range(no_robots))

acceleration_along_Xacceleration_along_Y 中的 position 函数应该返回机器人的最新位置。最新是指上一次迭代结束时的位置。因此,每个进程都必须通知其他进程它的最新位置。在知道机器人的最新位置之前,该过程必须等待。

其他方式可以是所有进程都编辑一个全局位置。(我想知道它是否可能,因为每个进程都有自己的虚拟地址空间)。如果一个进程尚未达到该迭代,则所有其他进程都必须等待。

关于如何去做的任何想法?我想这就是为什么在问题中建议使用 MPI 的原因。

【问题讨论】:

  • MPI = 消息传递接口 (?) This document 提供了一些关于 MPI 和设置并行系统的不同方法的体面信息。 (不过,这是一个 Word 文档。)
  • 如果你一直坚持使用 MPI 来做这件事,我最近做了一个关于 N 体问题的简单 MPI 方法的课程(这就是你在这里试图模拟的,尽管在不同的上下文中)。幻灯片和视频在这里:cita.utoronto.ca/~ljdursi/PSP/Nbody-MPI/Nbody-MPI.html 这是一个棘手的并行化问题,因为每个机器人都需要来自其他机器人的信息。 (模拟这类事情的实际问题使用近似值,基于这样一个事实,即更远的物体越来越不重要,因此不需要精确处理;但这可能对您没有帮助)

标签: python parallel-processing mpi


【解决方案1】:

注意:Python 的 threads 仍然在同一个处理器上运行。如果你想使用你机器的所有处理器,你应该使用multiprocessing (python2.6+)。

只有在计算分布在多台计算机上时,使用 MPI 才能为您带来明显的好处。

有两种方法可以解决您的问题。由于您拥有完全独立的进程,您可以根据需要多次启动算法(为每个机器人传递唯一标识符),并让操作系统处理并发。

1 - 一个简短的 Linux shell 脚本(或 Windows BATCH 语言中的等效脚本):

#!/bin/sh
for i in {0..99}; do
    echo "Running $i"
    python launch.py $i &
done

注意:launch.py​​ 之后的 &amp; 这确保您实际上以连续方式启动所有进程,而不是等待一个完成然后启动下一个。

2 - 如果您想在 python 中完成这一切,您可以使用以下简单的并行化方法:

from multiprocessing import Pool

def your_algorithm(robot_id):
    print(robot_id)

if __name__ == "__main__":
    robots = 100  # Number of robots you need
    p = Pool(robots)  # Spawn a pool of processes (one per robot in this case)
    p.map(your_algorithm, range(robots))

map 函数负责为每个机器人分派一个独立的操作。

如果您确实需要使用 MPI,我建议 mpi4py

关于Cartesian grid代表什么的信息,试试this

【讨论】:

  • There are two approaches to your problem. :你提到了一个。另一个是?什么是“设置 MPI 过程的笛卡尔网格”。在我的问题陈述中是什么意思?这是否意味着“只需使用 MPI 编写此程序”。你能指点我一些相关的资源吗?
  • 很抱歉我没有包含 MPI,因为您的任务可以在不使用它的情况下完成。现已添加。
  • 我按照你的方法2。我不知道为什么,但它不起作用。什么都没有打印出来。 p = Pool(robots) 正在创建进程。之后就什么都没有了。
  • 您能详细说明一下吗?它在这里工作正常。您不应该在交互式 python 会话中尝试此操作。将代码保存为文件并使用 python script_name.py 运行它
  • 是的,它是空闲的。从终端它运行良好。非常感谢。现在,我需要研究 MPI 的东西。非常感谢您的帮助。我现在将开始阅读有关笛卡尔网格和 MPI 的内容。如果您添加有关 MPI 的任何附加信息,我会更高兴。因为我必须使用它,而且我绝对是它的菜鸟。
【解决方案2】:

所以这里的诀窍是,在每一步,所有机器人都必须在某个时间点查看所有其他机器人的数据。这使得高效的并行化变得困难!

一种简单的方法是让每个进程在自己的机器人上运行,首先计算自交互,然后从其他处理器一个接一个地获取数据并计算这些力。请注意,这不是唯一的方法!此外,此类事物(分子动力学或大多数天体 N 体模拟)的现实世界求解器完全采用不同的策略,仅近似处理远处的物体,因为远处的物体不如近处的物体重要。

下面是该方法的简单 python 实现,它使用两个 mpi 进程(使用 mpi4py 和 matplotlib/pylab 进行绘图)。对此的概括将是一个管道;每个处理器将其数据块发送到下一个邻居,进行强制计算,然后重复此过程,直到每个人都看到每个人的数据。理想情况下,您会随着管道的进展更新绘图,这样就不必一次将所有数据都保存在内存中。

你可以用 mpirun -np 2 ./robots.py 运行这个程序;请注意,您需要安装 MPI 库,然后 mpi4py 需要知道在哪里可以找到这些库。

还要注意,我将所有机器人数据发送给邻居非常浪费;邻居关心的只是职位。

#!/usr/bin/env python
import numpy
import pylab
from mpi4py import MPI

class Robot(object):
    def __init__(self, id, x, y, vx, vy, mass):
        self.id = id
        self.x = x
        self.y = y
        self.vx = vx
        self.vy = vy
        self.ax = 0.
        self.ay = 0.
        self.mass = mass
    def rPrint(self):
        print "Robot ",self.id," at (",self.x,",",self.y,")"
    def interact(self, robot2):
        dx = (self.x-robot2.x)
        dy = (self.y-robot2.y)
        eps = 0.25
        idist3 = numpy.power(numpy.sqrt(dx*dx +dy*dy + eps*eps),-3)
        numerator = -self.mass*robot2.mass
        self.ax += numerator*dx*idist3
        self.ay += numerator*dy*idist3
        robot2.ax -= numerator*dx*idist3
        robot2.ay -= numerator*dy*idist3
    def updatePos(self, dt):
        self.x += 0.5*self.vx*dt
        self.y += 0.5*self.vy*dt
        self.vx += self.ax*dt
        self.vy += self.ay*dt
        self.x += 0.5*self.vx*dt
        self.y += 0.5*self.vy*dt
        self.ax = 0.
        self.ay = 0.



def init(nRobots):
    myRobotList = []
    vx = 0.
    vy = 0.
    mass = 1.
    for i in range(nRobots):
        randpos = numpy.random.uniform(-3,+3,2)
        rx = randpos[0]
        ry = randpos[1]
        myRobotList.append(Robot(i, rx, ry, vx, vy, mass))
    return myRobotList

def selfForces(robotList):
    nRobots = len(robotList)
    for i in range(nRobots-1): 
       for j in range (i+1, nRobots):
            robotList[i].interact(robotList[j])

def otherRobotForces(myRobotList, otherRobotList):
    for i in myRobotList:
        for j in otherRobotList:
            i.interact(j)

def plotRobots(robotList):
    xl = []
    yl = []
    vxl = []
    vyl = [] 
    for i in robotList:
       xl.append(i.x)
       yl.append(i.y)
       vxl.append(i.vx)
       vyl.append(i.vy)
    pylab.subplot(1,1,1)
    pylab.plot(xl,yl,'o')
    pylab.quiver(xl,yl,vxl,vyl)
    pylab.show()

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    nprocs = comm.Get_size()
    rank   = comm.Get_rank()

    if (nprocs != 2):
        print "Only doing this for 2 for now.."
        sys.exit(-1)
    neigh = (rank + 1) %  nprocs

    robotList = init(50)

    for i in range (10):
        print "[",rank,"] Doing step ", i
        selfForces(robotList)

        request = comm.isend(robotList, dest=neigh, tag=11)
        otherRobotList = comm.recv(source=neigh, tag=11)

        otherRobotForces(robotList,otherRobotList)

        request.Wait()

        for r in robotList:
            r.updatePos(0.05)



    if (rank == 0):
        print "plotting Robots"
        plotRobots(robotList + otherRobotList)

【讨论】:

  • 你能推荐一些很好的资源来解释使用 python (mpi4py) 的 MPI。使用 python MPI 看起来很简单:)
  • mpi4py 肯定比在 C 或 FORTRAN 中更简单......不幸的是,mpi4py 文档假定您了解 MPI。我建议通过一个简短的 MPI 入门教程(那里有很多;我的一个可以在 cita.utoronto.ca/~ljdursi/PSP/Intro-MPI-PartI/…cita.utoronto.ca/~ljdursi/PSP/Homework-IntroMPI-Answers/… 找到)。对于 mpi4py,一个快速的总结是 comm = MPI.COMM_WORLD; np=comm.Get_size() 和 p=comm.Get_rank() 获取该列表中的处理器总数和您的进程#(从 0..np-1)
  • comm.send(, dest=#, tag=#) 将数据发送到带有 rank# dest 和任意标签的进程(但发送方 + 接收方必须同意该标签)。 =comm.recv(source=#, tag=#) 一旦发送数据,就会从源接收数据。发送和接收块直到完成;要开始发送并执行其他操作,请执行 request=comm.isend(..) ,然后执行 request.Wait() 阻塞直到完成。 comm.Allreduce(loc, globsum, op=SUM) 将所有 loc 全局求和为 globsum;也有最小值和最大值。这足以让您入门。
【解决方案3】:

我的解决方案也类似于 Unode,但我更喜欢在 multiprocessing 中使用 apply_async 方法,因为它是异步的。

from multiprocessing import Pool

    def main():
        po = Pool(100) #subprocesses
        po.apply_async(function_to_execute, (function_params,), callback=after_processing)
        po.close() #close all processes
        po.join() #join the output of all processes
        return    

【讨论】:

  • 我没有包含该选项,因为它似乎不相关。脚本、退出状态和运行时间都没有提及连续性。而且由于 Pool 的大小与机器人的数量相同,所以应该没有关系。
猜你喜欢
  • 1970-01-01
  • 2021-12-11
  • 1970-01-01
  • 1970-01-01
  • 2013-02-06
  • 1970-01-01
  • 1970-01-01
  • 2018-09-22
  • 2016-07-27
相关资源
最近更新 更多