【发布时间】:2020-03-20 07:25:13
【问题描述】:
我正在尝试迭代数据源:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val job = Job.getInstance
FileInputFormat.addInputPath(
job,
new Path("file.parquet.gz")
)
val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
new HadoopInputFormat(
new AvroParquetInputFormat[GenericRecord],
classOf[Void],
classOf[GenericRecord],
job
)
val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)
当我执行 data.print 时,我可以看到元组中的数据。
但是当我这样做时:
data.map
{
res =>
println("!!!!!!!!!!!111")
println( res.f1)
}
什么都没有打印出来。
我想迭代数据源并获取 GenericRecord。请帮帮我。
【问题讨论】:
标签: hadoop datasource apache-flink flink-streaming