【发布时间】:2020-05-30 05:38:40
【问题描述】:
当我训练数据集时(数据集由 tf.data.Dataset.from_generator(x) 创建,其中 x 是由
df.rdd.flatMap(func).toLocalIterator()
得到的迭代器,func 是应用于数据集每一行的函数),repeat() 函数对该数据集不起作用。我使用的是 TensorFlow 1.14.0。错误信息如下:
您的数据集迭代器用完了数据;训练被中断。请确保您的迭代器至少能生成
validation_steps * epochs 个批次(本例中为 3 个批次)。构建数据集时,您可能需要使用 repeat() 函数。
有什么办法能让 repeat() 函数正常工作?我可以提供更多细节,但我认为问题只是 repeat() 本身在我的使用场景下不起作用。
编辑:
在我的脚本“pipeline.py”中有:
import numpy as np
from src.spark_utils.spark_session.sparkSession import arranca_spark
import tensorflow as tf
def f_input(
    row,
    future_steps=10,
    input_steps=10,
    slice=1,
    start=0,
    end=0
):
    """Yield sliding input windows cut from one row of sequence columns.

    The seven sequence columns of ``row`` are stacked into a 2-D array and
    window ``i`` covers positions ``i * slice`` .. ``i * slice + input_steps - 1``.
    Only window indices that leave room for a further ``future_steps``
    positions after ``input_steps`` are produced.

    Args:
        row: Mapping-like record (e.g. a pyspark ``Row`` or a dict) holding the
            keys 'customer_id_list', 'year_list', 'month_list', 'day_list',
            'importe_list', 'cif_id_list' and 'cat_id_list'. All sequences are
            assumed to have the same length.
        future_steps: Positions that must still exist beyond each window.
        input_steps: Length of every window.
        slice: Stride between consecutive window starts; must be >= 1. The name
            shadows the builtin but is kept for keyword compatibility.
        start: First window index. A negative value counts back from the last
            valid window (``-1`` selects only the last one).
        end: Number of trailing valid windows to skip.

    Yields:
        A 7-tuple of 1-D arrays of length ``input_steps``, one per column in
        the order listed above.

    Raises:
        ValueError: If ``slice`` is smaller than 1 (raised on first iteration).
    """
    if slice < 1:
        raise ValueError("slice must be >= 1")
    columns = (
        'customer_id_list', 'year_list', 'month_list', 'day_list',
        'importe_list', 'cif_id_list', 'cat_id_list',
    )
    input_array = np.array([np.array(row[name]) for name in columns])
    _, n = input_array.shape
    # One past the last window index whose window plus the following
    # ``future_steps`` positions still fit. For slice == 1 this equals
    # ``n - input_steps + 1 - future_steps``; dividing by ``slice`` keeps
    # larger strides from indexing past the end of the sequence.
    n_windows = (n - input_steps - future_steps) // slice + 1
    # Clamp at 0: a too-negative start, or a series shorter than one window,
    # would otherwise yield negative indices that numpy silently wraps around.
    first = max(0, n_windows + start) if start < 0 else start
    for i in range(first, n_windows - end):
        offset = i * slice
        window = input_array[:, offset:offset + input_steps]
        yield tuple(window)
class Input:
    """Callable that streams sliding input windows out of a DataFrame.

    Calling an instance flat-maps ``f_input`` over ``data.rdd`` with this
    instance's window settings and returns a local iterator over the
    resulting tuples, suitable as a ``tf.data.Dataset.from_generator`` source.

    Args:
        data: Object exposing ``.rdd`` (a Spark DataFrame in this file's usage).
        input_steps, future_steps, start, end, slice: Window settings forwarded
            unchanged to ``f_input``.
    """

    def __init__(self, data, input_steps, future_steps, start, end, slice=1):
        self.data = data
        self.input_steps = input_steps
        self.future_steps = future_steps
        self.start = start
        self.end = end
        self.slice = slice

    def __call__(self, *args, **kwargs):
        """Return a local iterator over the windows of every row in ``data``.

        Positional and keyword arguments are accepted and ignored.
        """
        # Copy the settings into locals. The function below is serialized and
        # shipped to Spark executors, and closing over ``self`` would pull the
        # DataFrame (and its SparkContext) into that closure.
        input_steps = self.input_steps
        future_steps = self.future_steps
        start = self.start
        end = self.end
        step = self.slice

        def windows(row):
            return f_input(
                row,
                future_steps=future_steps,
                input_steps=input_steps,
                slice=step,
                start=start,
                end=end,
            )

        return self.data.rdd.flatMap(windows).toLocalIterator()
def f_output(
    row,
    future_steps=10,
    input_steps=10,
    slice=1,
    start=0,
    end=0
):
    """Yield label windows shifted ``future_steps`` ahead from one row.

    The four label columns of ``row`` are stacked into a 2-D array and window
    ``i`` covers positions ``i * slice + future_steps`` ..
    ``i * slice + future_steps + input_steps - 1``.

    Args:
        row: Mapping-like record (e.g. a pyspark ``Row`` or a dict) holding the
            keys 'year_list', 'month_list', 'day_list' and 'cif_id_list'. All
            sequences are assumed to have the same length.
        future_steps: Offset of each label window from its window start.
        input_steps: Length of every label window.
        slice: Stride between consecutive window starts; must be >= 1. The name
            shadows the builtin but is kept for keyword compatibility.
        start: First window index. A negative value counts back from the last
            valid window (``-1`` selects only the last one).
        end: Number of trailing valid windows to skip.

    Yields:
        A 4-tuple of arrays shaped ``(input_steps, 1)``, one per column in the
        order listed above.

    Raises:
        ValueError: If ``slice`` is smaller than 1 (raised on first iteration).
    """
    if slice < 1:
        raise ValueError("slice must be >= 1")
    columns = ('year_list', 'month_list', 'day_list', 'cif_id_list')
    output_array = np.array([np.array(row[name]) for name in columns])
    _, n = output_array.shape
    # One past the last window index whose shifted window still fits. For
    # slice == 1 this equals ``n - input_steps + 1 - future_steps``.
    n_windows = (n - input_steps - future_steps) // slice + 1
    # Clamp at 0 so negative indices can never be produced (numpy would
    # silently wrap them around to the end of the sequence).
    first = max(0, n_windows + start) if start < 0 else start
    for i in range(first, n_windows - end):
        offset = i * slice + future_steps
        window = output_array[:, offset:offset + input_steps]
        # Trailing axis of size 1: each label is shaped (input_steps, 1).
        # The window length follows input_steps instead of a fixed 10.
        yield tuple(window.reshape(len(columns), input_steps, 1))
class Output:
    """Callable that streams label windows out of a DataFrame.

    Calling an instance flat-maps ``f_output`` over ``data.rdd`` with this
    instance's window settings and returns a local iterator over the
    resulting tuples, suitable as a ``tf.data.Dataset.from_generator`` source.

    Args:
        data: Object exposing ``.rdd`` (a Spark DataFrame in this file's usage).
        input_steps, future_steps, start, end, slice: Window settings forwarded
            unchanged to ``f_output``.
    """

    def __init__(self, data, input_steps, future_steps, start, end, slice=1):
        self.data = data
        self.input_steps = input_steps
        self.future_steps = future_steps
        self.start = start
        self.end = end
        self.slice = slice

    def __call__(self, *args, **kwargs):
        """Return a local iterator over the label windows of every row.

        Positional and keyword arguments are accepted and ignored.
        """
        # Bind plain values so the function shipped to Spark executors does not
        # close over ``self`` (and with it the DataFrame / SparkContext).
        input_steps = self.input_steps
        future_steps = self.future_steps
        start = self.start
        end = self.end
        step = self.slice

        def windows(row):
            return f_output(
                row,
                future_steps=future_steps,
                input_steps=input_steps,
                slice=step,
                start=start,
                end=end,
            )

        return self.data.rdd.flatMap(windows).toLocalIterator()
class Pipeline(object):
    """Build repeated, batched ``tf.data`` datasets of (features, labels).

    Features come from ``Input`` and labels from ``Output``. Each is streamed
    from ``df`` through its own ``from_generator`` source, and the two are
    zipped together.

    Args:
        df: DataFrame handed to ``Input``/``Output`` (presumably a Spark
            DataFrame; confirm against callers).
        batch_size: Batch size applied after ``repeat()``.
        input_steps: Window length; fixes the static shape of every element.
        future_steps: Forwarded to ``Input``/``Output``.
    """

    def __init__(self, df, batch_size, input_steps, future_steps):
        self.df = df
        self.batch_size = batch_size
        self.input_steps = input_steps
        self.future_steps = future_steps

    def _dataset(self, start, end):
        """Return the zipped, repeated and batched dataset for one window range.

        The same ``start``/``end`` are passed to both ``Input`` and ``Output``
        so that the feature and label generators are configured identically.
        """
        feature_shape = tf.TensorShape((self.input_steps,))
        label_shape = tf.TensorShape((self.input_steps, 1))
        features = tf.data.Dataset.from_generator(
            generator=lambda: Input(data=self.df, input_steps=self.input_steps,
                                    future_steps=self.future_steps,
                                    start=start, end=end)(),
            output_types=(tf.float32, tf.float32, tf.int32, tf.int32,
                          tf.float32, tf.int32, tf.int32),
            output_shapes=(feature_shape,) * 7,
        )
        labels = tf.data.Dataset.from_generator(
            generator=lambda: Output(data=self.df, input_steps=self.input_steps,
                                     future_steps=self.future_steps,
                                     start=start, end=end)(),
            output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
            output_shapes=(label_shape,) * 4,
        )
        # NOTE(review): features and labels are two independent passes over
        # ``df``; zipping them assumes both passes yield rows in the same order.
        # ``repeat()`` without a count makes the dataset infinite, so Keras must
        # be given steps_per_epoch / validation_steps explicitly.
        return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)

    @property
    def train_dataset(self):
        """Dataset starting at window 0 and skipping the last ``input_steps + 1`` windows."""
        return self._dataset(start=0, end=self.input_steps + 1)

    @property
    def test_dataset(self):
        """Dataset built from ``start=-1, end=0``, i.e. the last valid window per row."""
        return self._dataset(start=-1, end=0)
在另一个文件中,我这样调用这些类:
from pyspark.sql import Row
# Column names of the single sample row, in the order f_input reads them.
names = ("customer_id_list", "year_list", "month_list", "day_list", "importe_list", 'cif_id_list', 'cat_id_list')
customer = [1] * 24
year = [0.5] * 24
month = list(range(10, 22)) * 2
day = list(range(10, 22)) * 2
importe = list(range(1, 13)) * 2
cif_id = [1, 2, 3] * 8
cat_id = [cat for cat in range(1, 5) for _ in range(3)] * 2
row_dict = dict(zip(names, (customer, year, month, day, importe, cif_id, cat_id)))
# NOTE(review): `spark` (an active SparkSession) is not created in this
# snippet; presumably it comes from arranca_spark() — confirm.
df = spark.createDataFrame([Row(**row_dict)])
# Named input_fn rather than `input` to avoid shadowing the builtin.
input_fn = Input(df, input_steps=10, future_steps=10, start=0, end=0)
print(list(input_fn()))
然后我得到错误:
线程 “serve toLocalIterator” 中出现异常 org.apache.spark.SparkException:作业因阶段失败而中止:阶段 0.0 中的任务 0 失败了 4 次,最近一次失败:......
知道为什么会这样吗?
【问题讨论】:
-
尝试先不使用 repeat() 打印数据集的元素,然后使用 repeat(2) 重试。你得到的元素数量相同吗?
标签: tensorflow keras generator tensorflow-datasets training-data