【问题标题】:What's going on in tf.train.shuffle_batch and `tf.train.batch?tf.train.shuffle_batch 和 `tf.train.batch 发生了什么?
【发布时间】:2017-03-26 12:27:34
【问题描述】:

我使用Binary data 来训练 DNN。

但是tf.train.shuffle_batchtf.train.batch让我很困惑。

这是我的代码,我将对其进行一些测试。

第一Using_Queues_Lib.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 100
REAL32_BYTES=4


def read_dataset(filename_queue,data_length,label_length):
  class Record(object):
    pass
  result = Record()

  result_data  = data_length*REAL32_BYTES
  result_label = label_length*REAL32_BYTES
  record_bytes = result_data + result_label

  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

  record_bytes = tf.decode_raw(value, tf.float32)
  result.data  = tf.strided_slice(record_bytes, [0],[data_length])#record_bytes: tf.float list
  result.label = tf.strided_slice(record_bytes, [data_length],[data_length+label_length])
  return result


def _generate_data_and_label_batch(data, label, min_queue_examples,batch_size, shuffle):
  num_preprocess_threads = 16   #only speed code
  if shuffle:
    data_batch, label_batch = tf.train.shuffle_batch([data, label],batch_size=batch_size,num_threads=num_preprocess_threads,capacity=min_queue_examples + batch_size,min_after_dequeue=min_queue_examples)
  else:
    data_batch, label_batch = tf.train.batch([data, label],batch_size=batch_size,num_threads=num_preprocess_threads,capacity=min_queue_examples + batch_size)
  return data_batch, label_batch

def inputs(data_dir, batch_size,data_length,label_length):
  filenames = [os.path.join(data_dir, 'test_data_SE.dat')]
  for f in filenames:
    if not tf.gfile.Exists(f):
      raise ValueError('Failed to find file: ' + f)

  filename_queue = tf.train.string_input_producer(filenames)

  read_input = read_dataset(filename_queue,data_length,label_length)

  read_input.data.set_shape([data_length])   #important
  read_input.label.set_shape([label_length]) #important


  min_fraction_of_examples_in_queue = 0.4
  min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN *
                       min_fraction_of_examples_in_queue)
  print ('Filling queue with %d samples before starting to train. '
     'This will take a few minutes.' % min_queue_examples)

  return _generate_data_and_label_batch(read_input.data, read_input.label,
                                     min_queue_examples, batch_size,
                                     shuffle=True)

第二个Using_Queues.py

import Using_Queues_Lib
import tensorflow as tf
import numpy as np
import time


max_steps=10
batch_size=100
data_dir=r'.'
data_length=2
label_length=1

#-----------Save paras-----------
import struct
def WriteArrayFloat(file,data):
  fout=open(file,'wb')        
  fout.write(struct.pack('<'+str(data.flatten().size)+'f',
                                *data.flatten().tolist()))
  fout.close()
#-----------------------------

def add_layer(inputs, in_size, out_size, activation_function=None):
  Weights = tf.Variable(tf.truncated_normal([in_size, out_size]))
  biases = tf.Variable(tf.zeros([1, out_size]) + 0.1)
  Wx_plus_b = tf.matmul(inputs, Weights) + biases
  if activation_function is None:
    outputs = Wx_plus_b
  else:
    outputs = activation_function(Wx_plus_b)
  return outputs

data_train,labels_train=Using_Queues_Lib.inputs(data_dir=data_dir,
                      batch_size=batch_size,data_length=data_length,
                                          label_length=label_length)

xs=tf.placeholder(tf.float32,[None,data_length])
ys=tf.placeholder(tf.float32,[None,label_length])

l1 = add_layer(xs, data_length, 5, activation_function=tf.nn.sigmoid)
l2 = add_layer(l1, 5, 5, activation_function=tf.nn.sigmoid)
prediction = add_layer(l2, 5, label_length, activation_function=None)

loss = tf.reduce_mean(tf.square(ys - prediction))
train_step = tf.train.GradientDescentOptimizer(0.2).minimize(loss)

sess=tf.InteractiveSession()
tf.global_variables_initializer().run()

tf.train.start_queue_runners()

for i in range(max_steps):
  start_time=time.time()
  data_batch,label_batch=sess.run([data_train,labels_train])
  sess.run(train_step, feed_dict={xs: data_batch, ys: label_batch})
  duration=time.time()-start_time
  if i % 1 == 0:
    example_per_sec=batch_size/duration
    sec_pec_batch=float(duration)
    WriteArrayFloat(r'./data/'+str(i)+'.bin',
        np.concatenate((data_batch,label_batch),axis=1))
    format_str=('step %d,loss=%.8f(%.1f example/sec;%.3f sec/batch)')
    loss_value=sess.run(loss, feed_dict={xs: data_batch, ys: label_batch})
    print(format_str%(i,loss_value,example_per_sec,sec_pec_batch))

here中的数据。它由Mathematica生成。

data = Flatten@Table[{x, y, x*y}, {x, -1, 1, .05}, {y, -1, 1, .05}];
BinaryWrite[file, mydata, "Real32", ByteOrdering -> -1];
Close[file];

数据长度:1681

数据如下:

绘制数据:红色绿色颜色表示它们在here

中出现的时间

运行Using_Queues.py,它将产生十个批次,我在此图中绘制每个批次:(batch_size=100min_queue_examples=40

如果batch_size=1024min_queue_examples=40

如果batch_size=100min_queue_examples=4000

如果batch_size=1024min_queue_examples=4000

即使 batch_size=1681min_queue_examples=4000

该区域没有用点填充。

为什么?

那么为什么要把min_queue_examples 改成更随机的呢? min_queue_examples的值如何确定?

tf.train.shuffle_batch 发生了什么?

【问题讨论】:

  • 题外话:你的阴谋如何?情节非常漂亮。
  • @HughPerkins Mathematica 11
  • 感谢您提出的好问题。你能再解释一下情节吗? “红色到绿色的颜色意味着它们在这里发生的时间”是什么意思?当 batch_size=100 和 min_queue_examples=40 时,每个图都是一个批次,不同颜色的点意味着什么?

标签: python tensorflow


【解决方案1】:

tf.train.shuffle_batch()(以及因此tf.RandomShuffleQueue)使用的采样函数有点微妙。实现使用tf.RandomShuffleQueue.dequeue_many(batch_size),其(简化)实现如下:

  • 虽然出队的元素数量少于batch_size
    • 等到队列至少包含min_after_dequeue + 1 元素。
    • 从队列中均匀随机选择一个元素,将其从队列中移除,并将其添加到输出批次中。

另外要注意的是元素是如何添加到队列中的,它使用了在同一个队列上运行tf.RandomShuffleQueue.enqueue()的后台线程:

  • 等待队列的当前大小小于其capacity
  • 将元素添加到队列中。

因此,队列的capacitymin_after_dequeue 属性(加上正在排队的输入数据的分布)决定了tf.train.shuffle_batch() 将从中采样的总体。您的输入文件中的数据似乎是有序的,因此您完全依赖 tf.train.shuffle_batch() 函数来实现随机性。

依次进行可视化:

  1. 如果 capacitymin_after_dequeue 相对于数据集而言较小,则“洗牌”将从小群体中选择随机元素,类似于跨数据集的“滑动窗口”。很有可能您会在出队的批次中看到旧元素。

  2. 如果batch_size 相对于数据集很大而min_after_dequeue 很小,则“洗牌”将再次从整个数据集的一个小的“滑动窗口”中进行选择。

  3. 如果min_after_dequeue 相对于batch_size 和数据集的大小而言较大,您将看到(大约)来自每批数据的统一样本。

  4. 如果 min_after_dequeuebatch_size 相对于数据集的大小而言较大,您将看到(大约)来自每批数据的统一样本。

  5. 1234563某些元素将被多次采样(您不太可能只对每个唯一元素进行一次采样)。

【讨论】:

  • 有没有关于如何选择min_after_dequeue 的好习惯。例如,我有一个文件包含3240785个示例,我设置了min_after_dequeue=20000,BATCH_SIZE=128,capacity=min_queue_examples + batch_size,是一个很好的做法选择?是否可以随机播放数据?
【解决方案2】:

shuffle_batch 只不过是异步的 RandomShuffleQueue 实现。你首先要了解什么是异步。然后shuffle_batch应该很容易理解,在官方文档(https://www.tensorflow.org/versions/r1.3/programmers_guide/threading_and_queues)的帮助下。 假设您想设计一个可以同时读取和写入数据的系统。大多数人都是这样设计的:

1) 创建一个读数据的线程,一个写数据的线程 数据。读取线程将从队列中删除一个元素以进行读取(出队),写入线程将向队列添加一个元素作为写入的结果(入队)。

2) 使用阻塞队列来管理读取之间的同步 和写线程,因为你不希望阅读线程是 读取与写入线程正在写入的数据相同的数据,并且当 队列为空,读取线程应该挂起(阻塞)等待 写入线程要写入(入队)的数据,以及何时入队 已满,写线程应该等待读线程 从队列中弹出数据(出队)。 在 tensorflow 输入管道中,情况并没有什么不同。基本上有两组线程在工作,一个是将训练示例添加到队列中,另一个负责从队列中获取训练示例进行训练。这正是 slice_input_producer、string_input_producer、shuffle_batch 的设计方式。

我给你写了一个小程序,让你了解tensorflow输入管道,shuffle_batch以及min_after_dequeue和batch_size参数的作用:

import tensorflow as tf
import numpy as np
test_size = 2000
input_data = tf.range(test_size)

xi = [x for x in range(0, test_size, 50)[1:]]
yi = [int(test_size * x) for x in np.array(range(1, 100, 5)) / 100.0]
zi = np.zeros(shape=(len(yi), len(xi)))
with tf.Session() as sess:
    for idx, batch_size in enumerate(xi):
        for idy, min_after_dequeue in enumerate(yi):
            # synchronization example 1: create a fifo queue, one thread is
            # adding many training examples  at a time to the queue, and the other
            # is taking one example at a time out of the queue.
            # this is similar to what slice_input_producer does.
            fifo_q = tf.FIFOQueue(capacity=test_size, dtypes=tf.int32,
                                  shapes=[[]])
            en_fifo_q = fifo_q.enqueue_many(input_data)
            single_data = fifo_q.dequeue()
            # synchronization example 2: create a random shuffle queue, one thread is
            # adding one training example  at a time to the queue, and the other
            # is taking many examples as a batch at a time out of the queue.
            # this is similar to what shuffle_batch does.
            rf_queue = tf.RandomShuffleQueue(capacity=test_size,
                                             min_after_dequeue=min_after_dequeue,
                                             shapes=single_data._shape, dtypes=single_data._dtype)
            rf_enqueue = rf_queue.enqueue(single_data)
            batch_data = rf_queue.dequeue_many(batch_size)

            # now let's creating threads for enqueue operations(writing thread).
            # enqueue threads have to be started at first, the tf session will
            # take care of your training(reading thread) which will be running when you call sess.run.
            # the tf coordinators are nothing but threads managers that take care of the life cycle
            # for created threads
            qr_fifo = tf.train.QueueRunner(fifo_q, [en_fifo_q] * 8)
            qr_rf = tf.train.QueueRunner(rf_queue, [rf_enqueue] * 4)
            coord = tf.train.Coordinator()
            fifo_queue_threads = qr_fifo.create_threads(sess, coord=coord, start=True)
            rf_queue_threads = qr_rf.create_threads(sess, coord=coord, start=True)
            shuffle_pool = []
            num_steps = int(np.ceil(test_size / float(batch_size)))
            for i in range(num_steps):
                shuffle_data = sess.run([batch_data])
                shuffle_pool.extend(shuffle_data[0].tolist())
            # evaluating unique_rate of each combination of batch_size and min_after_dequeue
            # unique rate 1.0 indicates each example is shuffled uniformly.
            # unique rate < 1.0 means that some examples are shuffled twice.
            unique_rate = len(np.unique(shuffle_pool)) / float(test_size)
            print min_after_dequeue, batch_size, unique_rate
            zi[idy, idx] = unique_rate
            # stop threads.
            coord.request_stop()
            coord.join(rf_queue_threads)
            coord.join(fifo_queue_threads)

print xi, yi, zi
plt.clf()
plt.title('shuffle_batch_example')
plt.ylabel('num_dequeue_ratio')
plt.xlabel('batch_size')
xxi, yyi = np.meshgrid(xi, yi)
plt.pcolormesh(xxi, yyi, zi)
plt.colorbar()
plt.show()

如果你运行上面的代码,你应该会看到图:

我们可以清楚的看到,batch_size越大,unique_rate就越高,而min_after_dequeue越小,unique rate就越高。 唯一率是我计算的一个指标,用于监控在小批量的 shuffle_batch 中生成了多少重复样本。

【讨论】:

    【解决方案3】:

    使用 decode_raw 读取原始数据。

    float_values = tf.decode_raw(data, tf.float32, little_endian=True)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-10-03
      • 2022-01-10
      • 1970-01-01
      • 1970-01-01
      • 2016-02-28
      • 2014-02-28
      • 2010-10-02
      • 2020-03-14
      相关资源
      最近更新 更多