【问题标题】:Flink DataSource IterateFlink 数据源迭代
【发布时间】:2020-03-20 07:25:13
【问题描述】:

我正在尝试迭代数据源:


     val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      val job = Job.getInstance
      FileInputFormat.addInputPath(
        job,
        new Path("file.parquet.gz")
      )

      val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
        new HadoopInputFormat(
          new AvroParquetInputFormat[GenericRecord],
          classOf[Void],
          classOf[GenericRecord],
          job
        )
       val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)

当我执行 data.print 时,我可以看到元组中的数据。

但是当我这样做时:


    data.map
     {
       res =>
         println("!!!!!!!!!!!111")
         println( res.f1)
     }

什么都没有打印出来。

我想迭代数据源并获取 GenericRecord。请帮帮我。

【问题讨论】:

    标签: hadoop datasource apache-flink flink-streaming


    【解决方案1】:

    为了在不调用printcollect的情况下执行Flink批处理程序,你需要调用env.execute()。在没有上述API调用的情况下,只有这个调用会触发程序的执行。

    【讨论】:

      【解决方案2】:

      您可以使用 data.collect 然后使用: data.iterator().next() 进行迭代

      【讨论】:

        猜你喜欢
        • 2018-07-25
        • 2016-02-26
        • 1970-01-01
        • 2011-07-19
        • 1970-01-01
        • 2023-03-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多