【问题标题】:How do I get an output schema for an Apache Beam SQL query?如何获取 Apache Beam SQL 查询的输出模式?
【发布时间】:2018-09-07 15:17:55
【问题描述】:

我一直在使用 Beam SQL DSL,如果不提供手动了解输出模式的编码器,我将无法使用查询的输出。我可以推断输出模式而不是硬编码吗?

walkthroughexamples 均未实际使用查询的输出。我使用 Scio 而不是普通的 Java API 来保持代码相对可读和简洁,我认为这对这个问题没有影响。

这是我的意思的一个例子。

给定一个输入架构 inSchema 和一些映射到 Row 的数据源,如下所示:(在此示例中,基于 Avro,但我认为这并不重要):

sc.avroFile[Foo](args("input"))
   .map(fooToRow)
   .setCoder(inSchema.getRowCoder)
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   .saveAsTextFile(args("output"))

运行此管道会产生KryoException,如下所示:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
fieldIndices (org.apache.beam.sdk.schemas.Schema)
schema (org.apache.beam.sdk.values.RowWithStorage)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException

但是,插入与 SQL 输出匹配的 RowCoder,在本例中为单个 count int 列:

   ...snip...
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
   .saveAsTextFile(args("output"))

现在管道运行良好。

鉴于我们指定了输入模式/编码器和查询,因此必须手动告诉管道如何对 SQL 输出进行编码似乎没有必要。在我看来,我们应该能够从中推断出输出模式 - 但除了直接使用 Calcite 之外,我看不出如何?

在 Beam Jira 上提出罚单之前,我想我会检查一下我没有遗漏一些明显的东西!

【问题讨论】:

    标签: sql apache-beam apache-calcite


    【解决方案1】:

    输出模式推断should work,你的期望是正确的。这似乎是一个错误(在 Beam 或 Scio 中),已提交 BEAM-5335 以供调查。

    【讨论】:

    • 我认为这是 Scio 中的一个错误 - 管道在纯 Java 中工作,事实证明。然后我看到Scio在应用转换后设置了编码器本身,也许这就是问题所在。我会在 repo github.com/spotify/scio/blob/… 上提出一张票
    • 感谢您的关注!我会用你的发现更新 Jira。我不熟悉 Scio 在扩展 PTrasnforms 时如何设置编码器,但它看起来确实是 bug 所在的位置。
    • 总体而言,Beam SQL 和 Schemas 正在积极开发中,时不时会出现错误和重大更改。另外,据我所知,Beam SQL 和 Schemas 目前都没有针对 Scio 进行显式测试,因此那里可能存在额外的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-24
    • 2018-11-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-09
    • 1970-01-01
    相关资源
    最近更新 更多