【问题标题】:Distributed training of tf.learn Estimators?tf.learn 估计器的分布式训练?
【发布时间】:2017-03-23 00:43:12
【问题描述】:

我想使用 Tensorflow 高级 API 以分布式方式在 MNIST 上训练卷积神经网络。 我试图指定一个集群配置,并将其传递给一个 Estimator(下面的代码)。

我收到以下错误 MergeFrom() 的参数必须是同一类的实例:预期的 tensorflow.ConfigProto 得到了属性

有人知道我指定配置的方式有什么问题吗?

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function


import grpc
import numpy as np
import tensorflow as tf
from tensorflow.contrib import learn
from tensorflow.contrib.learn.python.learn.estimators import  model_fn as model_fn_lib
from tensorflow.contrib.learn.python.learn.estimators import run_config as run_config_lib
from tensorflow.python import debug as tf_debug
tf.logging.set_verbosity(tf.logging.ERROR)
import json
import os
import shutil

### Data - Mnist

mnist=learn.datasets.load_dataset('mnist')
train_data=mnist.train.images
train_labels=np.asarray(mnist.train.labels, dtype=np.int32)
eval_data=mnist.test.images
eval_labels=np.asarray(mnist.test.labels, dtype=np.int32)

BATCH_SIZE=100
NUM_EPOCHS=10
train_input_fn = learn.io.numpy_input_fn({'x': train_data}, train_labels, shuffle=True, batch_size=BATCH_SIZE, 
                                         num_epochs=NUM_EPOCHS)
batch_size = 100
num_epochs = 1
eval_input_fn = learn.io.numpy_input_fn({'x': eval_data}, eval_labels, shuffle=False, batch_size=batch_size, num_epochs=num_epochs) 

### Cluster

my_cluster = {'ps': ['/cpu:0'],
                 'worker': ['/gpu:0']}
os.environ['TF_CONFIG'] = json.dumps(
          {'cluster': my_cluster,
           'task': {'type': 'worker', 'index': 1}})

my_configs=learn.RunConfig()

server = tf.train.Server(server_or_cluster_def=my_configs.cluster_spec, job_name='worker')

### Model

def cnn_model_fn(features, labels, mode):

    input_layer=tf.reshape(features['x'],shape=[-1,28,28,1])

    #conv1
    conv1=tf.layers.conv2d(inputs=input_layer,
                           filters=32,
                           kernel_size=[5, 5],
                           padding='same',
                           activation=tf.nn.relu)
    pool1=tf.layers.max_pooling2d(inputs=conv1, pool_size=[2,2], strides=2)

    #conv2
    conv2=tf.layers.conv2d(inputs=pool1,
                           filters=64,
                           kernel_size=[5,5],
                           padding='same',
                           activation=tf.nn.relu)
    pool2=tf.layers.max_pooling2d(inputs=conv2, pool_size=[2,2], strides=2)

    #fully connected layers
    pool2_flat=tf.reshape(pool2, [-1, 7*7*64])
    dense1=tf.layers.dense(pool2_flat, 1024, activation=tf.nn.relu)
    dropout = tf.layers.dropout(inputs=dense1, rate=0.4, training=mode == learn.ModeKeys.TRAIN)

    #fc2
    logits=tf.layers.dense(dropout, 10, activation=tf.nn.relu)
    loss = None
    train_op = None

    #loss
    if mode != learn.ModeKeys.INFER:
        onehot_labels=tf.one_hot(indices=tf.cast(labels, tf.int32),depth=10)
        loss=tf.losses.softmax_cross_entropy(onehot_labels=onehot_labels, logits=logits)

    #optimizer
    if mode == learn.ModeKeys.TRAIN:
        with tf.device("/job:worker/task:1"):
            train_op = tf.contrib.layers.optimize_loss(
            loss=loss,
            global_step=tf.contrib.framework.get_global_step(),
            learning_rate=0.0001,
            optimizer="Adam")

    #predictions
    predictions={
            'classes': tf.argmax(logits, axis=1) ,
            'predictions': tf.nn.softmax(logits,name="softmax_tensor")           
        }
    return model_fn_lib.ModelFnOps(mode=mode, predictions=predictions, loss=loss, train_op=train_op)

classifier=learn.Estimator(model_fn=cnn_model_fn, model_dir="/tmp/mnist_distributed", config=my_configs)

### logging

tensors_to_log = {"probabilities": "softmax_tensor"}
logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=50)

### Metrics

metrics = {
  "accuracy":
      learn.MetricSpec(
          metric_fn=tf.metrics.accuracy, prediction_key="classes"),
}

### Distributing training

distributed_experiment=learn.Experiment(estimator=classifier, 
                train_input_fn=train_input_fn,
                eval_input_fn=eval_input_fn,
                eval_metrics=metrics,
                #train_monitors=my_monitors,
                train_steps=200,
                )

distributed_experiment.train_and_evaluate()

【问题讨论】:

    标签: python machine-learning tensorflow computer-vision distributed-computing


    【解决方案1】:

    如果你想在 TF 中运行分布式估计器,有一个实例:

    from tensorflow.contrib.learn.python.learn import learn_runner
    from tensorflow.contrib.learn.python.learn.estimators import run_config
    
    ...
    
    learn_runner.run(
      experiment_fn=create_experiment_fn(config),
      output_dir=output_dir)
    

    这里的“experiment_fn”只是代码中的“distributed_experiment”。您的实验中还应该有一个“输出目录”

    【讨论】:

      【解决方案2】:

      my_config 应该是 RunConfig 的一个实例,而不是 RunConfig 本身。当 RunConfig 初始化时,它将从 TF_CONFIG 环境变量加载 ps、workers 和任务配置。 https://www.tensorflow.org/api_docs/python/tf/contrib/learn/RunConfig

      【讨论】:

      • 类实例化已修复。谢谢,我现在可以启动服务器了,但是当我运行 Distributed_experiment.train() 时,上面的代码仍然停止,你看到我可能还缺少什么吗?
      • @smh 你能提供任何日志吗?你没有 1 ps 和 1 工人吗?
      • 我需要启动不同的服务器作为 ps 吗?我错误地定义了集群,或者训练在 cnn_model_fn 中的分布方式,但还没有弄清楚如何修复它。这些是我在 distributed_experiment.train() 停止之前得到的日志:DEBUG:tensorflow:Setting feature info to {'x': TensorSignature(dtype=tf.float32, shape=TensorShape([Dimension(None), Dimension(784)]), is_sparse=False)}. DEBUG:tensorflow:Setting labels info to TensorSignature(dtype=tf.int32, shape=TensorShape([Dimension(None)]), is_sparse=False) INFO:tensorflow:Create CheckpointSaverHook.
      • 您应该创建两个不同的进程,一个是 ps,另一个是 worker。 ps 进程只是在工作人员进行实验时使用 server.join() 。分布式张量流参考:ischlag.github.io/2016/06/12/async-distributed-tensorflow
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-15
      • 1970-01-01
      • 1970-01-01
      • 2018-01-02
      • 2018-11-07
      • 1970-01-01
      相关资源
      最近更新 更多