【Question title】: Distributed TensorFlow: many networks, one machine
【Posted】: 2019-10-15 16:17:08
【Question】:

I can successfully train a neural network in a distributed fashion. The strategy I use is to replicate the graph on each worker.

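For my current application, I need to define more than 100 neural networks, each of which is trained asynchronously by all workers. Distributing these 100 networks among the workers is not an option for me. The previous graph-replication approach is not optimal, because it requires more than 10 GB of RAM per worker.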
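A rough, back-of-the-envelope way to see why the memory grows: if each worker ends up holding one copy of every network's parameters, per-worker memory scales with the number of networks, whereas a single reusable local buffer scales with only one network's size. The sketch below assumes 4-byte (int32/float32) parameters; the function names and the parameter count are hypothetical and only illustrate the arithmetic.

```python
# Rough memory estimate, assuming each worker materializes one full copy of
# every network's parameters. Names and sizes are hypothetical.


def bytes_per_worker(num_networks: int, params_per_network: int,
                     bytes_per_param: int = 4) -> int:
    """Memory one worker needs if it holds every network's parameters."""
    return num_networks * params_per_network * bytes_per_param


def bytes_single_buffer(params_per_network: int, bytes_per_param: int = 4) -> int:
    """Memory one worker needs if it holds a single local buffer at a time."""
    return params_per_network * bytes_per_param


if __name__ == "__main__":
    n_networks = 100         # "more than 100 networks" in the question
    params = 10_000_000      # hypothetical parameter count per network
    full = bytes_per_worker(n_networks, params)
    single = bytes_single_buffer(params)
    print(f"all networks: {full / 2**30:.2f} GiB, one buffer: {single / 2**30:.3f} GiB")
```

The ratio between the two is simply the number of networks, which is why a "one local variable per worker" design is attractive here.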

What I am trying to do now is to define, for each worker, a "local" variable that holds a copy of a "global"/shared variable.

I came up with this implementation (func is called in a new process for each job):

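import tensorflow as tf  # TensorFlow 1.x API (tf.train.Server, tf.Session, ...)

N = 100     # number of networks (illustrative value)
BIG = 1000  # size of each network's variable (illustrative value)

def func(cluster, job_name, task_index):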
    server = tf.train.Server(cluster, job_name, task_index)
    name = "/job:{}/task:{}".format(job_name, task_index)
    device = tf.train.replica_device_setter(worker_device=name, cluster=cluster)
    #########
    with tf.device(device), tf.variable_scope("global"):
        # Here I define the "global" variables. In this snippet there is one variable per network
        variables = [tf.Variable(tf.zeros(shape=(BIG, ), dtype=tf.int32), name="variable{:05d}".format(i)) for i in range(N)]
    #########
    worker_0_device = tf.train.replica_device_setter(worker_device="/job:worker/task:0", cluster=cluster)
    with tf.device(worker_0_device), tf.variable_scope("local"):
        # Here I define one "local" variable per worker. They are placed on the parameter server.
        local_variable = tf.Variable(tf.zeros(shape=(BIG, ), dtype=tf.int32), name="worker{:02d}".format(task_index))
        # Here I define operations for downloading / uploading between "global" and "local" variables
        # These operations are placed on worker 0 (the chief)
        local_variable_download = [local_variable.assign(gvar) for gvar in variables]
        local_variable_upload = [gvar.assign(local_variable) for gvar in variables]
    #########
    with tf.device(device), tf.variable_scope("local"):
        # Here I define the gradient computation / update of the local variable
        # These operations are placed on the current worker (according to the arguments passed to this function)
        one = tf.gradients(local_variable, local_variable)[0]
        train = local_variable.assign_add(one)
    #########
    sess = tf.Session(target=server.target)
    sess.run(tf.global_variables_initializer())
    #########
    if job_name == "ps":
        server.join()
    elif job_name == "worker":
        for a in range(10):
            for b in range(N):
                # Copy from ps0 to ps0 on device /job:worker/task:0
                download = sess.run(local_variable_download[b])[0]
                # Fake training on device /job:worker/task:n
                train_ = sess.run(train)[0]
                # Copy from ps0 to ps0 on device /job:worker/task:0
                upload = sess.run(local_variable_upload[b])[0]
                print("pass", a, "download:", download, "train:", train_, "upload:", upload)
    #########
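To make the intended data flow easier to follow, here is a plain-Python model of the loop above (not TensorFlow): a dict stands in for the "global" variables on the parameter server, and one reusable list stands in for the worker's single "local" variable. It models a single worker only, so there is no contention; all names (`make_globals`, `run_worker`) are hypothetical.

```python
# Plain-Python model of the download / train / upload loop (not TensorFlow).
# A dict plays the parameter server's "global" variables; one list plays the
# worker's only "local" copy. All names are hypothetical.
from typing import Dict, List

N = 4      # number of networks (illustrative)
BIG = 3    # size of each network's variable (illustrative)


def make_globals(n: int, size: int) -> Dict[int, List[int]]:
    """One zero-initialised 'global' variable per network."""
    return {i: [0] * size for i in range(n)}


def run_worker(global_vars: Dict[int, List[int]], passes: int, size: int) -> None:
    """Download, train and upload each network in turn through one local buffer."""
    local = [0] * size                      # the only per-worker copy
    for _ in range(passes):
        for b in global_vars:
            local[:] = global_vars[b]       # download: global -> local
            for k in range(size):           # fake training: add one, as in the question
                local[k] += 1
            global_vars[b] = list(local)    # upload: local -> global


if __name__ == "__main__":
    gv = make_globals(N, BIG)
    run_worker(gv, passes=10, size=BIG)
    print(gv)
```

With one worker every network ends up incremented once per pass; the problems listed below only appear once several workers run this loop concurrently against the same globals.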

There are two problems with this code:

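  • Some weight updates (training steps) are lost. Sometimes two workers download the same global variable at the same time, and the second upload overwrites the first. I can think of ways to fix this, but right now I am more concerned about the second problem.

  • I still have memory problems. It looks as if every thread allocates the global variables.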
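The first problem is a classic lost update. The deterministic replay below assumes two workers that both download a global value before either uploads. Uploading the whole local copy (analogous to `assign`) loses one update; uploading only the change (analogous to `assign_add`, which exists on TF 1.x variables) keeps both. This is just one possible workaround, shown as a sketch; the function names are hypothetical and it is not necessarily the fix the author has in mind.

```python
# Deterministic replay of the race: both workers download before either uploads.
# Function names are hypothetical.


def overwrite_upload(initial: int) -> int:
    """Each worker uploads its whole local copy (like assign): one update is lost."""
    global_value = initial
    local_a = global_value          # worker A downloads
    local_b = global_value          # worker B downloads the same value
    local_a += 1                    # A trains
    local_b += 1                    # B trains
    global_value = local_a          # A uploads
    global_value = local_b          # B uploads and overwrites A's result
    return global_value


def delta_upload(initial: int) -> int:
    """Each worker uploads only its change (like assign_add): both updates survive."""
    global_value = initial
    base_a = global_value           # worker A downloads
    base_b = global_value           # worker B downloads the same value
    local_a = base_a + 1            # A trains
    local_b = base_b + 1            # B trains
    global_value += local_a - base_a    # A pushes its delta
    global_value += local_b - base_b    # B pushes its delta
    return global_value


if __name__ == "__main__":
    print("overwrite:", overwrite_upload(0), "delta:", delta_upload(0))
```

Pushing deltas makes the two updates commute, at the cost of each worker training on a slightly stale copy, which is the usual trade-off in asynchronous training.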

There is something I don't understand here.

Thanks a lot for your help!

【Discussion】:

    Tags: python tensorflow architecture


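    【Answer 1】:

    You could try running the processes asynchronously. A Celery/Redis architecture is one you could follow; below are some useful documentation and setup links: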

    https://docs.celeryproject.org/en/latest/index.html
    https://stackabuse.com/asynchronous-tasks-in-django-with-redis-and-celery/
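As a rough illustration of the task-queue idea, the sketch below uses the standard library's `concurrent.futures.ThreadPoolExecutor` as a stand-in for Celery workers and a Redis broker (neither is used here). Each network's training is submitted as an independent task; `train_network` is a hypothetical placeholder, not real training code.

```python
# Stdlib stand-in for the Celery/Redis task-queue idea: one task per network.
# train_network is a hypothetical placeholder for real training.
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


def train_network(network_id: int, steps: int) -> int:
    """Placeholder 'training': network_id would select which model to load."""
    value = 0
    for _ in range(steps):
        value += 1
    return value


def train_all(num_networks: int, steps: int, max_workers: int = 4) -> Dict[int, int]:
    """Submit one task per network and collect each task's result."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {i: pool.submit(train_network, i, steps) for i in range(num_networks)}
        return {i: f.result() for i, f in futures.items()}


if __name__ == "__main__":
    print(train_all(num_networks=5, steps=3))
```

With Celery, the same shape would be a function decorated with `@app.task` and enqueued with `.delay(...)`. Note that this assigns whole networks to tasks, which the question says is not an option, so it only illustrates the answer's suggestion rather than solving the shared-variable problem.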
    
