【Posted】: 2019-10-15 16:17:08
【Question】:
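I can successfully train a neural network in a distributed fashion. The strategy I use is to replicate the graph on every worker.
For my current application, I need to define more than 100 neural networks, each of which is trained asynchronously by all the workers. Distributing these 100 networks across the workers is not an option for me. The graph-replication approach I used before is not practical here, because it requires more than 10 GB of RAM per worker.
What I want to do now is define, on each worker, a "local" variable that holds a copy of a "global" (shared) variable.
I came up with this implementation (`func` is called in a new process for each job):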
```python
import tensorflow as tf  # TensorFlow 1.x API (tf.train.Server, tf.Session)

# BIG (size of each variable) and N (number of networks) are defined elsewhere in the script.

def func(cluster, job_name, task_index):
    server = tf.train.Server(cluster, job_name, task_index)
    name = "/job:{}/task:{}".format(job_name, task_index)
    device = tf.train.replica_device_setter(worker_device=name, cluster=cluster)
    #########
    with tf.device(device), tf.variable_scope("global"):
        # Here I define the "global" variables. In this snippet, there is one variable per network.
        variables = [tf.Variable(tf.zeros(shape=(BIG, ), dtype=tf.int32), name="variable{:05d}".format(i)) for i in range(N)]
    #########
    worker_0_device = tf.train.replica_device_setter(worker_device="/job:worker/task:0", cluster=cluster)
    with tf.device(worker_0_device), tf.variable_scope("local"):
        # Here I define one "local" variable per worker. They are placed on the parameter server.
        local_variable = tf.Variable(tf.zeros(shape=(BIG, ), dtype=tf.int32), name="worker{:02d}".format(task_index))
        # Here I define the operations for downloading / uploading between the "global" and "local" variables.
        # These operations are placed on worker 0 (the chief).
        local_variable_download = [local_variable.assign(gvar) for gvar in variables]
        local_variable_upload = [gvar.assign(local_variable) for gvar in variables]
    #########
    with tf.device(device), tf.variable_scope("local"):
        # Here I define the gradient computation / update of the local variable.
        # These operations are placed on the current worker (according to the parameters passed to this function).
        one = tf.gradients(local_variable, local_variable)[0]
        train = local_variable.assign_add(one)
    #########
    sess = tf.Session(target=server.target)
    sess.run(tf.global_variables_initializer())
    #########
    if job_name == "ps":
        server.join()
    elif job_name == "worker":
        for a in range(10):
            for b in range(N):
                # Copy from ps0 to ps0 on device /job:worker/task:0
                download = sess.run(local_variable_download[b])[0]
                # Fake training on device /job:worker/task:n
                train_ = sess.run(train)[0]
                # Copy from ps0 to ps0 on device /job:worker/task:0
                upload = sess.run(local_variable_upload[b])[0]
                print("pass", a, "download:", download, "train:", train_, "upload:", upload)
    #########
```
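This code has two problems:
1. Some weight updates (training steps) are lost. Sometimes two workers download the same global variable at the same time, and the second upload overwrites the first. I can think of ways to work around this, but right now I am more concerned about the second problem.
2. I still have memory problems. It looks as if every thread allocates all of the global variables.
There is something here I don't understand.
Any help is greatly appreciated! Thanks.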
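The first problem (lost updates) can be reproduced without TensorFlow. Below is a minimal, deterministic sketch in plain Python, assuming two workers whose download/train/upload steps interleave as described: both download the same value before either uploads. A dict stands in for the parameter server, and the helper names `overwrite_upload` and `delta_upload` are hypothetical. The second function shows one common workaround, not the approach used in the code above: each worker uploads only the change it made (in TF 1.x terms, roughly an `assign_add` of the delta instead of an `assign`), assuming the server applies each addition atomically.

```python
# Deterministic simulation of the lost-update race (pure Python, no TensorFlow).
# A dict plays the role of the parameter server; each "worker" keeps a local copy.

def overwrite_upload(initial, steps_per_worker):
    """Both workers download, train locally, then upload by overwriting (like assign)."""
    ps = {"w": initial}
    local_a = ps["w"]              # worker A downloads
    local_b = ps["w"]              # worker B downloads the same value before A uploads
    local_a += steps_per_worker    # fake training: +1 per step
    local_b += steps_per_worker
    ps["w"] = local_a              # A uploads
    ps["w"] = local_b              # B's upload overwrites A's result
    return ps["w"]

def delta_upload(initial, steps_per_worker):
    """Same interleaving, but each worker uploads only its change (like assign_add)."""
    ps = {"w": initial}
    base_a = ps["w"]               # remember the downloaded value
    base_b = ps["w"]
    local_a = base_a + steps_per_worker
    local_b = base_b + steps_per_worker
    ps["w"] += local_a - base_a    # add A's delta to the current server value
    ps["w"] += local_b - base_b    # add B's delta; nothing is overwritten
    return ps["w"]

print(overwrite_upload(0, 5))  # 5: one worker's 5 updates are lost
print(delta_upload(0, 5))      # 10: both workers' updates are kept
```

Delta uploads only prevent updates from being overwritten; each worker still trains on a possibly stale copy, which is the usual trade-off of asynchronous training.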
【Discussion】:
Tags: python tensorflow architecture