【问题标题】:Getting List From PCollections从 PCollections 获取列表
【发布时间】:2018-12-02 19:29:23
【问题描述】:

我想要一个来自 Pcollection 的列表值。

PCollection<List<Integer>> lst =    bqT2.apply(ParDo.of(new 
UserId()));  // line 1
List myList = lst.getAll(); // line 2 

但是没有“getAll()”函数

我发现了类似的东西

    List<String> dummylist = Arrays.asList(dummy);
    DoFnTester<String,String> fnTester = DoFnTester.of(new AAA(mapview));
    fnTester.setSideInputInGlobalWindow(mapview, csvlist);
    //dummylines.apply(ParDo.of(fnTester));
    List<String> results = fnTester.processBatch(dummylist);

但我没有找到任何使用“DoFnTester”函数获取列表项的方法。

有没有办法从 PCollection 中列出?

为了详细说明,我有两个 PCollection。

PCollection   p1 = pipeline.apply("",
BigQueryIO.read().fromQuery("SELECT * from myTable where userid in " +  lst + ));

注意:lst 来自第 1 行

不确定谷歌数据流是否不支持简单的用例。

【问题讨论】:

    标签: java google-cloud-platform pipeline dataflow


    【解决方案1】:

    如果列表适合内存,apache_beam.transforms.combiners.ToList 可以为您工作。

    beam.combiners.ToList() 是 Python 版本。

    【讨论】:

    • beam.combiners.ToList() 创建一个带有单个元素的 PCollection,即您想要的列表。但是,它仍然不允许您将该列表作为普通 Python 对象访问。
    【解决方案2】:

    由于数据流管道的分布式特性,无法直接从 PCollection 访问数据。

    不要转换为列表,而是执行“组合”转换,将多个整数的 PCollection 转换为包含 SQL 查询中所需的单个列表元素的 PCollection,并使用之前的 PCollection 与 BigQuery 读取查询链接到另一个转换(整数列表)。

    【讨论】:

    • 你能通过修改上面的代码更清楚地说明“组合”变换吗?
    • Combine 是一种用于组合元素集合的转换。应用组合转换时,您必须提供包含用于组合元素或值的逻辑的函数。请注意,只有在这些值退出管道后,您才能使用它们。例如,this answer 提供了将 PCollection 转换为列表的示例,并将 SQL 查询写入文件,然后您可以使用该文件运行查询。
    猜你喜欢
    • 2018-05-14
    • 1970-01-01
    • 1970-01-01
    • 2019-12-31
    • 2013-02-19
    • 2015-11-27
    • 2012-04-04
    • 2017-06-09
    • 2021-02-14
    相关资源
    最近更新 更多