【问题标题】:AvroInputFormat returns set of object addresses instead of valuesAvroInputFormat 返回一组对象地址而不是值
【发布时间】:2020-02-23 13:43:45
【问题描述】:

我正在使用 flink AvroOutputFormat 写入一些数据,

val source: DataSet[Row] = environment.createInput(inputBuilder.finish)
val tableEnv: BatchTableEnvironment = new BatchTableEnvironment(environment, TableConfig.DEFAULT)
val table: Table = source.toTable(tableEnv)
val avroOutputFormat = new AvroOutputFormat[Row](classOf[Row])
avroOutputFormat.setCodec(AvroOutputFormat.Codec.NULL)
source.write(avroOutputFormat, "/Users/x/Documents/test_1.avro").setParallelism(1)
environment.execute()

这会将数据写入名为test_1.avro 的文件中。当我尝试读取文件时,

val users = new AvroInputFormat[Row](new Path("/Users/x/Documents/test_1.avro"), classOf[Row])
val usersDS = environment.createInput(users)
usersDS.print()

这会将行打印为,

java.lang.Object@4462efe1,java.lang.Object@7c3e4b1a,java.lang.Object@2db4ad1,java.lang.Object@765d55d5,java.lang.Object@2513a118,java.lang.Object@2bfb583b,java.lang.Object@73ae0257,java.lang.Object@6fc1020a,java.lang.Object@5762658b

有没有办法打印这个数据值而不是对象地址。

【问题讨论】:

  • userDS 类型是什么?好像你回退到Objects toString 方法
  • 我是DataSet[Row]

标签: scala apache-flink avro flink-batch


【解决方案1】:

您正在以一种奇怪的方式混合 Table API 和 Datastream API。最好坚持使用一个 API 或使用 proper conversion methods

你基本上不会让 Flink 知道预期的输入/输出模式。 classOf[Row] 是一切,又什么都不是。

要将表格写入 Avro 文件,请使用table connector。基本草图

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Avro().avroSchema("...")) // <- Adjust
    .withSchema(schema)
    .createTemporaryTable("AvroSinkTable")
table.insertInto("AvroSinkTable")

编辑:到目前为止,不幸的是,文件系统连接器不支持 Avro。

所以别无选择,只能使用数据集 API。我建议使用 avrohugger 为您的 avro 架构生成适当的 scala 类。

// convert to your scala class
val dsTuple: DataSet[User] = tableEnv.toDataSet[User](table)
// write out
val avroOutputFormat = new AvroOutputFormat<>(User.class)
avroOutputFormat.setCodec(Codec.SNAPPY)
avroOutputFormat.setSchema(User.SCHEMA$)
specificUser.write(avroOutputFormat, outputPath1)

【讨论】:

  • 我已经完成了代码。但现在我得到了Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in the classpath. Reason: No context matches.@Arvid
猜你喜欢
  • 1970-01-01
  • 2022-10-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多