【问题标题】:tf.data + generator + keras => repeat() does not work, why?tf.data + generator + keras => repeat() 不起作用,为什么?
【发布时间】:2020-05-30 05:38:40
【问题描述】:

当我训练我的数据集时(tf.data.dataset.from_generator(x),其中 x 是由

创建的迭代器
df.rdd.flatMap(func).toLocalIterator() 

func 应用于我的数据集的位置。 repeat() 函数不适用于数据集。我正在使用 tensorflow 版本 1.14.0。错误是:

您的数据集迭代器用完了数据;中断训练。制作 确保您的迭代器至少可以生成 validation_steps * epochs 批次(在本例中为 3 个批次)。您可能需要使用 构建数据集时的 repeat() 函数。

使重复功能起作用的可能解决方法是什么?我可以提供更多细节,但我认为正在发生的只是该功能本身在我的条件下不起作用。

编辑

在我的脚本“pipeline.py”中有:

import numpy as np
from src.spark_utils.spark_session.sparkSession import arranca_spark
import tensorflow as tf

def f_input(
        row: np.array,
        future_steps=10,
        input_steps=10,
        slice=1,
        start=0,
        end=0
):
    input_array = np.array(
        [
            np.array(row['customer_id_list']),
            np.array(row['year_list']),
            np.array(row['month_list']),
            np.array(row['day_list']),
            np.array(row['importe_list']),
            np.array(row['cif_id_list']),
            np.array(row['cat_id_list'])
        ]
    )
    _, n = input_array.shape
    if start < 0:
        for i in range(n - input_steps + 1 - future_steps + start,
                       n - input_steps + 1 - future_steps - end):
            input_indices = range(i * slice, i * slice + input_steps, 1)
            input = input_array[:, input_indices]
            yield input[0], input[1], input[2], input[3], input[4], input[5], input[6]
    else:
        for i in range(start, n - input_steps + 1 - future_steps - end):
            input_indices = range(i * slice, i * slice + input_steps, 1)
            input = input_array[:, input_indices]
            yield input[0], input[1], input[2], input[3], input[4], input[5], input[6]




class Input():
    def __init__(self, data, input_steps, future_steps, start, end):
        self.data =data
        self.input_steps =input_steps
        self.future_steps =future_steps
        self.start =start
        self.end =end
        self.slice=1

    def __call__(self, *args, **kwargs):
        func =lambda x: f_input(x)
        result = self.data.rdd.flatMap(func)
        return result.toLocalIterator()

def f_output(
        row: np.array,
        future_steps=10,
        input_steps=10,
        slice=1,
        start=0,
        end=0
):
    output_array = np.array(
        [
            np.array(row['year_list']),
            np.array(row['month_list']),
            np.array(row['day_list']),
            np.array(row['cif_id_list'])
        ]
    )
    _, n = output_array.shape
    if start < 0:
        for i in range(n - input_steps + 1 - future_steps + start,
                       n - input_steps + 1 - future_steps - end):
            output_indices = range(i * slice + future_steps, i * slice + input_steps + future_steps, 1)
            output = np.reshape(output_array[:, output_indices], (4, 10, 1))
            yield output[0], output[1], output[2], output[3]
    else:
        for i in range(start, n - input_steps + 1 - future_steps - end):
            output_indices = range(i * slice + future_steps, i * slice + input_steps + future_steps, 1)
            output = np.reshape(output_array[:, output_indices], (4, 10, 1))
            yield output[0], output[1], output[2], output[3]

class Output():
    def __init__(self, data, input_steps, future_steps, start, end):
        self.data = data
        self.input_steps = input_steps
        self.future_steps = future_steps
        self.start = start
        self.end = end

    def __call__(self, *args, **kwargs):
        func = lambda x: f_output(x)
        result = self.data.rdd.flatMap(func)
        return result.toLocalIterator()

class Pipeline(object):
    def __init__(self, df, batch_size, input_steps,future_steps):
        self.df =df
        self.batch_size=batch_size
        self.input_steps=input_steps
        self.future_steps=future_steps

    @property
    def train_dataset(self):
        features= tf.data.Dataset.from_generator(
            generator=lambda: Input(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps, start=0, end=self.input_steps+1)(),
            output_types=(tf.float32, tf.float32, tf.int32, tf.int32, tf.float32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,))
            )
        )
        labels = tf.data.Dataset.from_generator(
            generator=lambda: Output(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps,
                                         start=0, end=self.input_steps+1)(),
            output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1))
            )
        )
        return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)

    @property
    def test_dataset(self):
        features= tf.data.Dataset.from_generator(
            generator=lambda: Input(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps, start=-1, end=0)(),
            output_types=(tf.float32, tf.float32, tf.int32, tf.int32, tf.float32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,))
            )
        )
        labels = tf.data.Dataset.from_generator(
            generator=lambda: Output(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps,
                                         start=-1, end=0)(),
            output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1))
            )
        )
        return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)

从另一个文件中,我通过写来调用这些类:

from pyspark.sql import Row
names=("customer_id_list", "year_list", "month_list", "day_list", "importe_list", 'cif_id_list', 'cat_id_list')
customer=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
year=[0.5, 0.5, 0.5, 0.5, 0.5,0.5, 0.5, 0.5, 0.5, 0.5,0.5 ,0.5, 0.5, 0.5, 0.5, 0.5, 0.5,0.5, 0.5, 0.5, 0.5, 0.5,0.5 ,0.5 ]
month=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
day=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
importe=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cif_id=[1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3]
cat_id=[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]

row_dict={'customer_id_list': customer, 'year_list':year, 'month_list':month, 'day_list':day, 'importe_list':importe, 'cif_id_list':cif_id, 'cat_id_list':cat_id}
df = spark.createDataFrame([Row(**row_dict)])
input=Input(df,input_steps=10, future_steps=10, start=0, end=0)
print(list(input()))

然后我得到错误:

线程“serve toLocalIterator”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:阶段 0.0 中的任务 0 失败 4 次,最近一次......

知道为什么会这样吗?

【问题讨论】:

  • 尝试不使用repeat 打印数据集的元素,然后使用repeat(2) 重试。你得到相同数量的元素吗?

标签: tensorflow keras generator tensorflow-datasets training-data


【解决方案1】:

确保您的生成器函数可以被多次调用,或者使用.cache() 以便生成器只需要运行一次。

import tensorflow as tf

# Created only once outside of gen()
it = iter([1, 2, 3])

def gen():
  for element in it:
    yield element

dataset = tf.data.Dataset.from_generator(gen, tf.int32)
print(list(dataset.repeat(2).as_numpy_iterator()))  # Prints just [1, 2, 3]

如果您将迭代器创建移动到生成器函数中,repeat 将按照您的预期工作:

import tensorflow as tf

def gen():
  # Create iterator every time gen() is called
  it = iter([1, 2, 3])
  for element in it:
    yield element

dataset = tf.data.Dataset.from_generator(gen, tf.int32)
print(list(dataset.repeat(2).as_numpy_iterator()))  # Prints [1, 2, 3, 1, 2, 3]

另一种选择是使用缓存,这样gen 只会被调用一次:

import tensorflow as tf

# Created only once outside of gen()
it = iter([1, 2, 3])

def gen():
  for element in it:
    yield element

dataset = tf.data.Dataset.from_generator(gen, tf.int32)
# Add a `.cache()` here
print(list(dataset.cache().repeat(2).as_numpy_iterator()))  # Prints [1, 2, 3, 1, 2, 3]

【讨论】:

  • 非常感谢您的回答。我进行了编辑,并通过一些示例为您提供了完整的代码。我不知道为什么这现在不起作用。如果我将示例(编辑中的第二个代码)放在同一个脚本中,它确实可以工作......
猜你喜欢
  • 2013-08-14
  • 1970-01-01
  • 1970-01-01
  • 2019-07-20
  • 1970-01-01
  • 2019-05-03
  • 2020-08-03
  • 2016-07-13
  • 2017-06-20
相关资源
最近更新 更多