【问题标题】:How to convert PCollection<TableRow> to PCollection<KV<String, String>> in JAVA如何在 JAVA 中将 PCollection<TableRow> 转换为 PCollection<KV<String, String>>
【发布时间】:2021-12-14 18:18:48
【问题描述】:

我正在尝试将包含多个值的表格行转换为 KV。我可以在 DoFn 中实现这一点,但这会增加我想要进一步编写的代码的复杂性,并使我的工作更加困难。 (基本上我需要对tablerow的两个pcollections执行CoGroupBy操作)

有什么方法可以将 PCollection 转换为 PCollection>,其中的键和值以与 tablerow 中相同的格式存储?

我写了一个看起来像这样的 sn-p,但这并没有给我想要的结果,有什么方法可以加载 tablerow 中的所有条目并生成具有这些值的 KV?

ImmutableList<TableRow> input = ImmutableList.of(new TableRow().set("val1", "testVal1").set("val2", "testVal2").set("val3", "testVal3");
PCollection<TableRow> inputPC = p.apply(Create.of(input));

inputPC.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("val1"), (String) tableRow.get("val2"))));

【问题讨论】:

  • Beam 并没有任何内置的方法来转换 TableRow,因为这是一种基本上只在读取和写入 BigQuery 时使用的数据类型。出于这个原因,几乎所有涉及 TableRows 的问题的解决方案都需要编写一个 DoFn。您能否说明一下您当前的 DoFn 解决方案的功能以及您对它不满意的原因?
  • 我想对从数据流中的两个 bq 表中提取的数据执行连接操作,我无法通过查询直接实现这一点,因为我将动态获取表名。我是 beam 的初学者,所以我正在探索将 tablerow 转换为 KV 的方法,但文档没有任何有用的东西,现在我已经在我的 DoFn 中硬编码了一些值来实现这一点,但这并不理想。

标签: google-cloud-platform google-cloud-dataflow apache-beam dataflow beam-sql


【解决方案1】:

看起来您想要的是一种对从 BigQuery 获得的数据执行联接的方法。无法直接在 TableRows 上执行联接,这是因为 TableRows 通常不打算作为管道中的元素进行操作,它们的目的是专门用于使用 BigQuery IO 进行读写。

为了能够使用现有的 Beam 转换,您需要将这些 TableRows 转换为更有用的表示形式,例如您自己编写的 Java 对象或 Beam 模式 Row 类型。由于TableRow 本质上是一个 JSON 字符串字典,因此您需要做的就是编写一个 Map 函数来读取适当的类型并在必要时对其进行解析。例如:

PCollection<TableRow> tableRows = ... // Reading from BigQuery IO.
PCollection<Foo> foos = tableRows.apply(MapElements.via(
    new SimpleFunction<TableRow, Foo>() {
        @Override
        public Foo apply(TableRow row) {
            String bar = (String) row.get("bar");
            Integer baz = (Integer.parseInt((String) row.get("baz")));
            return new Foo(bar, baz);
        }
    });

一旦您拥有所选类型的数据,您就可以找到一种方法来使用内置的 Beam 转换执行连接。有很多潜在的方法可以做到这一点,所以我不会列出所有方法,但一个明确的首选是Join class

【讨论】:

  • 谢谢!我是 java 新手,你能推荐一些我可以用来为我的用例创建 Java 对象的参考吗
  • 对不起,我不知道。我实际上并没有太多使用 Java,我的知识仅限于 Apache Beam。但祝你好运。
【解决方案2】:

要将 PCollection TableRow 转换为 PCollection 字符串,您可以使用以下代码:

static class StringConverter extends DoFn<String, TableRow> {
@Override
public void processElement(ProcessContext c) { 
c.output(new TableRow().set("string_field", c.element())); }
}

Here 您可以阅读更多关于如何从 TableRow 转换为字符串的信息。

【讨论】:

  • 您好 Eduardo,感谢您的评论,但我正在寻找将 tablerow 转换为 KV,是否有可能实现这一点?
猜你喜欢
  • 2023-02-03
  • 1970-01-01
  • 2022-12-31
  • 1970-01-01
  • 2023-02-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多