【发布时间】:2014-10-08 13:56:34
【问题描述】:
我有一段需要并行化的代码,代码本身运行正常。这段代码是某个 Python 类的方法,例如:
class test:
def __init__(self):
<...>
def method(self):
<...>
我这样简写是因为完整代码很长,其中的细节可能与问题无关。一开始我尝试这样并行化这段代码(只用两个实例):
# Create two independent instances and run `method` of each in its own process.
t1=test()
t2=test()
# `args` must be a valid tuple; `method` takes no extra arguments, so it is
# empty (the original `args=(,)` is a SyntaxError).
pr1=Process(target=t1.method, args=())
pr2=Process(target=t2.method, args=())
# Start both workers before joining either one, then wait for both to finish.
pr1.start()
pr2.start()
pr1.join()
pr2.join()
但这样行不通:它不仅比依次运行两个实例慢得多,而且实例的类变量也没有被修改。后一个问题已经解决——感谢 @MattDMo 在 this thread 中的热心回答——办法是创建共享命名空间、共享变量和共享列表:
import multiprocessing as mp
<...>
self.manager=mp.Manager()
self.shared=self.manager.Namespace()
self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])
self.shared.V=V
但它仍然运行得很慢。
起初我以为,由于我是在只有两个核心的笔记本电脑上运行代码,两个实例会把两个核心都占满,计算机无法及时处理其他任务,所以才会变慢。于是我决定在一台 6 核的台式机(同样是 Linux 系统)上测试这段代码,但问题依旧:并行化版本仍然慢得多。另一方面,这台台式机的 CPU 并不像我运行多线程的 C 编译程序时那样发热。有人知道这是怎么回事吗?
完整代码见 here,同时也附在下面:
from math import exp
from pylab import *
from scipy.stats import norm
from scipy.integrate import ode
from random import gauss,random
from numpy import dot,fft
from time import time
import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing import Process
from multiprocessing import Queue, Pipe
from multiprocessing import Lock, current_process
#Simulation-wide constants
sec_steps = 1000          #time resolution: integration steps per simulated second
DT = 1.0 / sec_steps      #length of one integration step, in seconds
stdd = 20                 #standard deviation for retina random input
stdd2 = 20                #standard deviation for sigmoid
#FUNCTION TO APPROXIMATE NORMAL CUMULATIVE DISTRIBUTION FUNCTION
def sigmoid(x,mu,sigma):
    """Approximate the normal cumulative distribution function at x.

    x is standardised with mean `mu` and standard deviation `sigma`; the
    result is a logistic function of an odd polynomial in the standardised
    value. Beyond 8 standard deviations the result is clamped to exactly
    1 (above) or 0 (below).
    """
    z = (x - mu) / sigma
    # Saturated tails: skip the exponential entirely.
    if z > 8:
        return 1
    if z < -8:
        return 0
    # Coefficients of the z**5, z**3 and z terms of the fitted polynomial.
    c5, c3, c1 = -0.0004406, 0.0418198, 0.9
    poly = c5*z**5 + c3*z**3 + c1*z
    return 1/(1+exp(-sqrt(pi)*poly))
#CLASSES
class retina: ##GAUSSIAN WHITE NOISE GENERATOR
    """Gaussian white-noise source with mean `mu` and standard deviation `sigma`."""

    def __init__(self,mu,sigma):
        self.mu=mu
        self.sigma=sigma

    def create_pulse(self):
        """Return a zero-argument callable that draws one Gaussian sample per call."""
        def pulse():
            return gauss(self.mu,self.sigma)
            #alternative kept from the original: uniform noise with the same mean/std
            #return uniform(-1,1)*sqrt(3.)*self.sigma+self.mu
        return pulse

    def test_white_noise(self,N): #test frequency spectrum of random number generator for N seconds
        """Draw N seconds of noise, plot its amplitude spectrum and return the samples.

        The number of samples is sec_steps*N+1 (one per time step, both
        end points included). The plot is shown with pylab's show().
        """
        # The module-level name `fft` is bound to the numpy.fft *module* by
        # `from numpy import dot,fft`, so calling `fft(noise)` fails with
        # "module object is not callable". Import the transform explicitly.
        from numpy.fft import fft as fft_transform
        pulse=self.create_pulse()
        steps=sec_steps*N+1
        t=linspace(0,N,steps)
        noise=[pulse() for _ in t]
        X=fft_transform(noise)
        X=[abs(x)/(steps/2.0) for x in X] #normalise the amplitudes
        xlim([0,steps/N])
        xlabel("freq. (Hz)")
        ylabel("Ampl. (V)")
        plot((t*steps/N**2)[1:],X[1:],color='black') #index 0 is dropped from the plot
        #savefig('./wnoise.eps', format='eps', dpi=1000)
        show()
        return noise
class cleft: #object: parent class for a synaptic cleft
    """Parent class for a synaptic cleft.

    State that changes during the simulation (`preV`, `r`) lives in a
    multiprocessing manager Namespace stored in `self.shared`. Subclasses
    are expected to provide `alfa`, `beta`, `g` and `restV`, which
    `r_next` and `DI` read but this class does not define.
    """
    Tmax=1.0 #mM
    mu=-35.0 #mV
    sigma=stdd2 #mV

    def __init__(self):
        # The original referenced an undefined global `manager` here, which
        # raises NameError; create a per-instance manager as the sibling
        # cleft classes do.
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0 #pre-synaptic voltage
        self.shared.r=0.0 #proportion of channels opened

    def T(self): #Receives presynaptic Voltage preV, returns concentration of the corresponding neurotransmitter.
        """Return the neurotransmitter concentration for the current preV."""
        return self.Tmax*sigmoid(self.shared.preV,self.mu,self.sigma)

    def r_next(self): #Solves kinematic ode -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """
        Advance r by one time step DT using the analytical solution of the
        kinetic ode; the previous r is taken as the initial condition and
        is overwritten with the new value.
        """
        # Evaluate T once: every call reads shared.preV through the manager proxy.
        transmitter=self.T()
        tau=1.0/(self.alfa*transmitter+self.beta)
        r_inf=self.alfa*transmitter*tau
        self.shared.r=r_inf+(self.shared.r-r_inf)*exp(-DT/tau)

    def DI(self,postV): #Receives PSP and computes resulting change in PSC
        """Return g*r*(postV-restV) for the given post-synaptic potential."""
        return self.g*self.shared.r*(postV-self.restV)
class ampa_cleft(cleft): #Child class for ampa synaptic connection
    """AMPA synaptic cleft: alfa=2.0, beta=0.1, restV=0.0, g=0.1."""

    def __init__(self):
        self.manager=mp.Manager()
        state=self.manager.Namespace()
        state.preV=0.0
        state.r=0.5 #initial condition for r
        self.shared=state
        self.alfa,self.beta=2.0,0.1
        self.restV,self.g=0.0,0.1
class gaba_a_cleft(cleft): #Child class for GABAa synaptic connection
    """GABAa synaptic cleft: alfa=2.0, beta=0.08, restV=-75.0, g=0.2."""

    def __init__(self):
        # The original used an undefined global `manager` (NameError on
        # instantiation); create a per-instance manager like the other
        # cleft subclasses.
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5 #initial condition for r
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2
class gaba_a_cleft_trnTOtrn(cleft): #Child class for GABAa synaptic connection
    """GABAa cleft, TRN to TRN: alfa=2.0, beta=0.08, restV=-75.0, g=0.2."""

    def __init__(self):
        self.manager=mp.Manager()
        state=self.manager.Namespace()
        state.preV=0.0
        state.r=0.5 #initial condition for r
        self.shared=state
        self.alfa,self.beta=2.0,0.08
        self.restV,self.g=-75.0,0.2
class gaba_a_cleft_inTOin(cleft): #Child class for GABAa synaptic connection
    """GABAa cleft, IN to IN: alfa=2.0, beta=0.08, restV=-75.0, g=0.2."""

    def __init__(self):
        self.manager=mp.Manager()
        state=self.manager.Namespace()
        state.preV=0.0
        state.r=0.5 #initial condition for r
        self.shared=state
        self.alfa,self.beta=2.0,0.08
        self.restV,self.g=-75.0,0.2
class gaba_a_cleft_trnTOtcr(cleft): #Child class for GABAa synaptic connection
    """GABAa cleft, TRN to TCR: alfa=2.0, beta=0.08, restV=-85.0, g=0.1."""

    def __init__(self):
        self.manager=mp.Manager()
        state=self.manager.Namespace()
        state.preV=0.0
        state.r=0.5 #initial condition for r
        self.shared=state
        self.alfa,self.beta=2.0,0.08
        self.restV,self.g=-85.0,0.1
class gaba_a_cleft_inTOtcr(cleft): #Child class for GABAa synaptic connection
    """GABAa cleft, IN to TCR: alfa=2.0, beta=0.08, restV=-85.0, g=0.1."""

    def __init__(self):
        self.manager=mp.Manager()
        state=self.manager.Namespace()
        state.preV=0.0
        state.r=0.5 #initial condition for r
        self.shared=state
        self.alfa,self.beta=2.0,0.08
        self.restV,self.g=-85.0,0.1
class gaba_b_cleft(cleft): #Child class for GABAb synaptic connection
    # Besides preV and r, the shared namespace of this cleft also holds the
    # two intermediate state variables R and X that r_next updates.
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5 #initial condition for r
        self.shared.R=0.5 #initial condition for R
        self.shared.X=0.5 #initial condition for X
        self.alfa_1=0.02
        self.alfa_2=0.03
        self.beta_1=0.05
        self.beta_2=0.01
        self.restV=-100.0
        self.g=0.06
        self.n=4 #exponent applied to X when computing r
        self.Kd=100 #Dissociation constant
    def r_next(self): #Solves kinematic ode SECOND MESSENGER -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """
        Advance the cleft state by one time step DT.

        R and X are replaced by closed-form expressions evaluated from
        their previous values (R0, X0), and r is then recomputed as
        X**n/(X**n+Kd). Overrides the r_next of the parent class.
        """
        Q1=self.alfa_1*self.T()
        Q2=-Q1-self.beta_1
        #previous values, used as initial conditions for this step
        R0=self.shared.R
        X0=self.shared.X
        self.shared.R=(Q1*(exp(Q2*DT)-1)+Q2*R0*exp(Q2*DT))/Q2
        self.shared.X=(exp(-self.beta_2*DT)*(self.alfa_2*(self.beta_2*(exp(DT*(self.beta_2+Q2))*(Q1+Q2*R0)+Q1*(-exp(self.beta_2*DT))-Q2*R0)-Q1*Q2*(exp(self.beta_2*DT)-1))+self.beta_2*Q2*X0*(self.beta_2+Q2)))/(self.beta_2*Q2*(self.beta_2+Q2))
        #r is computed from the freshly updated X
        self.shared.r=self.shared.X**self.n/(self.shared.X**self.n+self.Kd)
#######################################################################################################################################################
class neuronEnsemble:
    """Parent class for a neuron ensemble.

    The membrane potential is kept in `self.shared.V` and the five
    post-synaptic current contributions in the manager list `self.I`.
    Subclasses are expected to provide `g`, `restV` and the connectivity
    constants `C`, which `V_next` reads.
    """
    kappa=1.0 #conductance

    def __init__(self,V): #Parent class for a Neuron ensemble
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        #Variables to store changes in PSC produced by synaptic connection
        self.I=self.manager.list([0.0]*5)
        #Actual state of the membrane potential
        self.shared.V=V

    def V_next(self): #ode analitycally for a single time step DT
        """Advance the membrane potential by one time step DT (analytical solution)."""
        rate=self.C[0]*self.g/self.kappa
        drive=(-dot(self.C,self.I)+self.C[0]*self.g*self.restV)/self.kappa
        v_inf=drive/rate #value V relaxes towards
        self.shared.V=v_inf+(self.shared.V-v_inf)*exp(-rate*DT)
class TCR_neuronEnsemble(neuronEnsemble):
    """TCR ensemble: leak conductance 0.01, leak rest potential -55.0."""

    def __init__(self,V):
        #manager, shared namespace, PSC list I and initial V are set up by the
        #parent (explicit call: the classes are old-style under Python 2)
        neuronEnsemble.__init__(self,V)
        self.g,self.restV=0.01,-55.0 #conductance of leak, rest of leak
        #connectivity constants to the ensemble: Cleak,C2,C3,C4,C7!!
        #First one is Cleak, the others in same order as in diagram
        self.C=(1.0,7.1,1.0/2.0*30.9/4.0,1.0/2.0*3.0*30.9/4.0,1.0/2.0*30.9)
class TRN_neuronEnsemble(neuronEnsemble):
    """TRN ensemble: leak conductance 0.01, leak rest potential -72.5."""

    def __init__(self,V):
        #manager, shared namespace, PSC list I and initial V are set up by the
        #parent (explicit call: the classes are old-style under Python 2)
        neuronEnsemble.__init__(self,V)
        self.g,self.restV=0.01,-72.5 #conductance of leak, rest of leak
        #connectivity constants to the ensemble: Cleak,C5,C8
        #First one is Cleak, the others in same order as in diagram
        self.C=(1.0,15.0,35.0,0.0,0.0)
class IN_neuronEnsemble(neuronEnsemble): #!!! update all parameters !!!
    """IN ensemble: leak conductance 0.01, leak rest potential -70.0."""

    def __init__(self,V):
        #manager, shared namespace, PSC list I and initial V are set up by the
        #parent (explicit call: the classes are old-style under Python 2)
        neuronEnsemble.__init__(self,V)
        self.g,self.restV=0.01,-70.0 #conductance of leak, rest of leak
        #connectivity constants to the ensemble: Cleak,C1,C6!!
        #First one is Cleak, the others in same order as in diagram
        self.C=(1.0,47.4,23.6,0.0,0.0)
######################################INSTANCE GROUP#################################################################
class group:
    """One instance of the circuit: three neuron ensembles (IN, TCR, TRN)
    and the eight synaptic clefts that connect them and the retina.

    NOTE(review): every `.shared.X` read or write and every `I[k]`
    assignment below goes through a multiprocessing manager proxy; the
    multiprocessing documentation notes that manager-based sharing is
    slower than shared memory, so this is a likely cost centre of `step`.
    """

    def __init__(self,tcr_V0,trn_V0,in_V0):
        #Declarations of instances
        ####################
        #SYNAPTIC CLEFTS
        self.cleft_ret_in=ampa_cleft() #cleft between retina and IN ensemble
        self.cleft_ret_tcr=ampa_cleft() #cleft between retina and TCR ensemble
        self.cleft_in_in=gaba_a_cleft_inTOin() #cleft between IN and IN ensembles
        self.cleft_in_tcr=gaba_a_cleft_inTOtcr() #cleft between IN and TCR ensembles
        self.cleft_tcr_trn=ampa_cleft() #cleft between TCR and TRN ensembles
        self.cleft_trn_trn=gaba_a_cleft_trnTOtrn() #cleft between TRN and TRN ensembles
        self.cleft_trn_tcr_a=gaba_a_cleft_trnTOtcr() #cleft between TRN and TCR ensembles GABAa
        self.cleft_trn_tcr_b=gaba_b_cleft() #cleft between TRN and TCR ensembles GABAb
        #POPULATIONS
        self.in_V0=in_V0 #mV i.c excitatory potential
        self.IN=IN_neuronEnsemble(self.in_V0) #create instance of IN ensemble
        self.tcr_V0=tcr_V0 #mV i.c excitatory potential
        self.TCR=TCR_neuronEnsemble(self.tcr_V0) #create instance of TCR ensemble
        self.trn_V0=trn_V0 #mV i.c inhibitory potential
        self.TRN=TRN_neuronEnsemble(self.trn_V0) #create instance of TRN ensemble

    def step(self,p): #makes a step of the circuit for the given instance
        """Advance the whole circuit by one time step; p is the retinal pulse.

        The ensembles are updated in the order TRN, TCR, IN, so later
        updates see the already-updated potentials of earlier ones.
        """
        #UPDATE TRN
        self.cleft_tcr_trn.shared.preV=self.TCR.shared.V #cleft takes presynaptic V
        self.cleft_tcr_trn.r_next() #cleft updates r
        self.TRN.I[2]=self.cleft_tcr_trn.DI(self.TRN.shared.V) #update PSC TCR--->TRN
        self.cleft_trn_trn.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_trn.r_next() #cleft updates r
        self.TRN.I[1]=self.cleft_trn_trn.DI(self.TRN.shared.V) #update PSC TRN--->TRN
        self.TRN.V_next() #update PSP in TRN
        #record retinal pulse ------|> IN AND TCR
        self.cleft_ret_in.shared.preV=self.cleft_ret_tcr.shared.preV=p
        #UPDATE TCR
        self.cleft_ret_tcr.r_next() #cleft updates r
        self.TCR.I[1]=self.cleft_ret_tcr.DI(self.TCR.shared.V) #update PSC RET---|> TCR
        self.cleft_trn_tcr_b.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_b.r_next() #cleft updates r
        self.TCR.I[2]=self.cleft_trn_tcr_b.DI(self.TCR.shared.V) #update PSC TRN--->TCR (GABAb)
        self.cleft_trn_tcr_a.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_a.r_next() #cleft updates r
        self.TCR.I[3]=self.cleft_trn_tcr_a.DI(self.TCR.shared.V) #update PSC TRN--->TCR (GABAa)
        self.cleft_in_tcr.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_tcr.r_next() #cleft updates r
        self.TCR.I[4]=self.cleft_in_tcr.DI(self.TCR.shared.V) #update PSC IN--->TCR
        self.TCR.V_next() #update PSP in TCR
        #UPDATE IN
        self.cleft_ret_in.r_next() #cleft updates r
        self.IN.I[1]=self.cleft_ret_in.DI(self.IN.shared.V) #update PSC RET---|> IN
        self.cleft_in_in.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_in.r_next() #cleft updates r
        self.IN.I[2]=self.cleft_in_in.DI(self.IN.shared.V) #update PSC IN--->IN
        self.IN.V_next() #update PSP in IN
    #----------------------------------------

    def stepN(self, p, N, data_Vtcr, data_Vtrn, data_Vin): #makes N steps, receives a vector of N retinal impulses and output lists
        """Run N steps and append the potentials to the three output lists.

        Each output list first receives the initial condition and then one
        value per step, so N+1 values are appended in total. p[i] is the
        retinal pulse fed to step i.
        """
        data_Vtcr.append(self.tcr_V0)
        data_Vtrn.append(self.trn_V0)
        data_Vin.append(self.in_V0)
        name=current_process().name #constant for this process, so looked up once
        #range() and the parenthesised single-string print are valid under both
        #Python 2 and Python 3 (the original xrange/print statement are Python 2 only)
        for i in range(N):
            self.step(p[i])
            data_Vtcr.append(self.TCR.shared.V) #write to output list
            data_Vtrn.append(self.TRN.shared.V)
            data_Vin.append(self.IN.shared.V)
            print(name+" "+str(i)) #progress: process name and step index
######################################################################################################################
############################### CODE THAT RUNS THE SIMULATION OF THE MODEL ###########################################
######################################################################################################################
def run(exec_t):
    """
    Run the simulation for t=exec_t seconds.

    Two `group` instances are driven by the same vector of retinal pulses,
    each in its own process. Their TCR, TRN and IN potentials are averaged,
    plotted from t=50 s onwards, and the averaged TCR trace is written to
    ./output.dat.

    Returns the tuple (t, array of the per-instance TCR traces).
    """
    t_0=time()
    mu=-45.0 #mV
    sigma=stdd #20.0 #mV
    ret=retina(mu,sigma) #create instance of white noise generator
    #initial conditions
    tcr_V0=-61.0 #mV i.c excitatory potential
    trn_V0=-84.0 #mV i.c inhibitory potential
    in_V0=-70.0 #mV i.c excitatory potential
    ###########################LISTS FOR STORING DATA POINTS################################
    t=linspace(0.0,exec_t,exec_t*sec_steps+1)
    n_steps=len(t)-1 #one step per interval between consecutive time points
    # Earlier attempt (one process per instance per time step), kept for reference:
    # data_Vtcr=[]
    # data_Vtcr.append(tcr_V0)
    #
    # data_Vtrn=[]
    # data_Vtrn.append(trn_V0)
    #
    # data_Vin=[]
    # data_Vin.append(in_V0)
    # ###NUMBER OF INSTANCES
    # N=2
    # pulse=ret.create_pulse()
    # #CREATE INSTANCES
    # groupN=[]
    # for i in xrange(N):
    # g=group(in_V0,tcr_V0,trn_V0)
    # groupN.append(g)
    #
    # for i in t[1:]:
    # p=pulse()
    # proc=[]
    # for j in xrange(N):
    # pr=Process(name="group_"+str(j),target=groupN[j].step, args=(p,))
    # pr.start()
    # proc.append(pr)
    # for j in xrange(N):
    # proc[j].join(N)
    #
    # data_Vtcr.append((groupN[0].TCR.shared.V+groupN[1].TCR.shared.V)*0.5) #write to output list
    # data_Vtrn.append((groupN[0].TRN.shared.V+groupN[1].TRN.shared.V)*0.5)
    # data_Vin.append((groupN[0].IN.shared.V+groupN[1].IN.shared.V)*0.5)
    #############FOR LOOPING INSIDE INSTANCE ---FASTER#############################################
    #CREATE p vector of retinal pulses
    pulse=ret.create_pulse()
    p=[pulse() for _ in range(n_steps)]
    #CREATE INSTANCES
    N=2
    manager=mp.Manager() #creating a shared namespace
    #one shared output list per instance and per recorded potential
    data_Vtcr=[manager.list() for _ in range(N)]
    data_Vtrn=[manager.list() for _ in range(N)]
    data_Vin=[manager.list() for _ in range(N)]
    groupN=[group(tcr_V0,trn_V0,in_V0) for _ in range(N)]
    #start every worker before joining any of them
    proc=[]
    for j in range(N):
        pr=Process(name="group_"+str(j),target=groupN[j].stepN, args=(p, n_steps, data_Vtcr[j], data_Vtrn[j], data_Vin[j],))
        pr.start()
        proc.append(pr)
    for pr in proc:
        pr.join()
    #Copy each manager list into a plain list with a single slice request
    #instead of fetching the elements one by one through the proxy.
    tcr_runs=[v[:] for v in data_Vtcr]
    trn_runs=[v[:] for v in data_Vtrn]
    in_runs=[v[:] for v in data_Vin]
    #average of the two instances, element by element
    data_Vtcr_av=[0.5*i for i in map(add, tcr_runs[0], tcr_runs[1])]
    data_Vtrn_av=[0.5*i for i in map(add, trn_runs[0], trn_runs[1])]
    data_Vin_av =[0.5*i for i in map(add, in_runs[0], in_runs[1])]
    #%-formatted single-string prints give the same output under Python 2 and 3
    print("%d %d %d" % (len(t), len(tcr_runs[0]), len(data_Vtcr_av)))
    ##Plotting#####################################
    first=50*sec_steps #index of t=50 s; earlier samples are not plotted
    subplot(3,1,1)
    xlabel('t')
    ylabel('tcr - mV')
    plot(t[first:],array(data_Vtcr_av)[first:], color='black')
    subplot(3,1,2)
    xlabel('t')
    ylabel('trn - mV')
    plot(t[first:],array(data_Vtrn_av)[first:], color='magenta')
    subplot(3,1,3)
    xlabel('t')
    ylabel('IN - mV')
    plot(t[first:],array(data_Vin_av)[first:], color='red')
    #savefig('./v_tcr.eps', format='eps', dpi=1000)
    ###############################################
    t_1=time() #measure elapsed time
    print("elapsed time:  %s  seconds." % (t_1-t_0))
    #save data to file; the with-block closes it even if a write fails
    with open("./output.dat","w") as FILE:
        FILE.write("########################\n")
        FILE.write("# t V #\n")
        FILE.write("########################\n")
        for k in range(len(t)):
            FILE.write(str(t[k]).zfill(5)+"\t"*3+repr(data_Vtcr_av[k])+"\n")
    #################
    show()
    return t,array(tcr_runs)
######################################################################################################################
######################################################################################################################
# Run the simulation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    run(60) #run simulation for 60 seconds
【问题讨论】:
-
method实际上在做什么?一般需要多长时间?在不了解您的工作量的情况下很难给您一个好的答案。 (另外,我是回答你另一个问题的人 - MattDMo 编辑它。:) -
另外,您是否尝试过在代码中添加打印语句,以确定到底是在哪里(where)变慢的?例如,在您启动每个
Process 和 method 实际开始运行之间是否有很长的延迟?还是创建共享列表花了很长时间?等等 -
另一个问题:您的
test实例中是否有任何大型数据结构?您使用的是什么平台? -
您好,现在大约需要 1 小时,而之前单个实例只需要 3 分钟。是的,我用过打印语句。实际情况是每个实例内部都有一个很长的 for 循环,所以代码并不是在某个特定位置变慢,我想只是每次迭代都比以前慢。我在 lubuntu 上通过 ipython 使用 Python。实例内部没有大型数据结构,只有小列表和变量。在类之外有两个非常大的列表,由实例负责更新;但我也考虑过这一点,并在测试时把它们断开了(没有帮助)。
-
@JanusGowda 链接已失效。
Oops! We can't find that snippet. The snippet you're looking for has either been deleted by its owner or it never existed to begin with.
标签: python linux parallel-processing multiprocessing