【问题标题】:How to compare all the values of two keys with in same PCollection in python?python - 如何比较两个键的所有值与python中相同的PCollection?
【发布时间】:2020-02-14 16:58:17
【问题描述】:

我是 Apache Beam/数据流的新手。我正在阅读 Apache Beam 中的 BigQuery 表,我想按两个不同的列进行分组,并比较两个不同键的所有值。我创建了一个包含两个不同列(ID、Date)的元组,它们充当键。以下是表格中的示例数据

  ID         Date        P_id    position
  "abc"    2019-08-01   "rt56"      5
  "abc"    2019-08-01   "rt57"      6
  "abc"    2019-08-01   "rt58"      7
  "abc"    2019-08-02   "rt56"      2 
  "abc"    2019-08-02   "rt57"      4
  "abc"    2019-08-02   "rt58"      7

现在我想比较 P_id 对 ("abc", 2019-08-01) 和 ("abc", 2019-08-02) 的位置,看看是否有任何 P_id 位置发生变化然后添加表“状态”中的另一列为 True。所以我的新表应该如下所示

我正在用下面的代码尝试它

  ID         Date        P_id    position  Status
  "abc"    2019-08-01   "rt56"      5       False (as this is first date)
  "abc"    2019-08-01   "rt57"      6
  "abc"    2019-08-01   "rt58"      7
  "abc"    2019-08-02   "rt56"      2       True
  "abc"    2019-08-02   "rt57"      4
  "abc"    2019-08-02   "rt58"      7
(
p 
| "get_key_tuple" >> beam.ParDo(lambda element: tuple(element["Id"], element["Date]))
| "group_by" >> beam.GroupByKey()
| "compare_and_add_status" >> beam.ParDo(compare_pos)
)

但是我不知道我应该如何处理函数 compare_pos()

考虑到我有一个非常大的表和很多 ID,如果我能有效地比较位置并创建一个新列来了解状态,这将非常有帮助。

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    Beam 的 GroupByKey 采用 2 元组的 PCollection 并返回一个 PCollection,其中每个元素都是键的 2 元组和与该键关联的所有值的(无序)可迭代。例如,如果您的原始集合包含元素

    (k1, v1)
    (k1, v2)
    (k1, v3)
    (k2, v4)
    

    GroupByKey 的结果将是一个带有类似元素的 PCollection

    (k1, [v1, v3, v2])
    (k2, [v4])
    

    在您的情况下,您的键和值本身就是元组。因此,您可以获取原始集合并应用 Map(lambda elt: ((elt['Id'], elt['Date']), (elt['P_id'], elt['position']))),这将为您提供包含元素的 PCollection

      ("abc", 2019-08-01),   ("rt56", 5)
      ("abc", 2019-08-01),   ("rt57", 6)
      ("abc", 2019-08-01),   ("rt58", 7)
      ("abc", 2019-08-02),   ("rt56", 2)
      ("abc", 2019-08-02),   ("rt57", 4)
      ("abc", 2019-08-02),   ("rt58", 7)
    

    应用 GroupByKey 后会变成

      ("abc", 2019-08-01),   [("rt56", 5), ("rt57", 6), ("rt58", 7)]
      ("abc", 2019-08-02),   [("rt56", 2), ("rt57", 4), ("rt58", 7)]
    

    此时,您的 compare_pos 函数可以检查与给定 ID, Date 对对应的所有 P_id, position 元组,并执行所需的任何逻辑来发出需要更改的内容(使用其对应的键)。

    【讨论】:

    • 感谢您的帮助。但这并不能完全回答我的问题。我被困在 Python 中实现 compare_pos 函数(即我如何比较 PCollection 中的元组以检查位置是否改变),这是我最初在问题中提出的。你能帮忙吗?
    • 我想我可能误解了你的问题。您实际上是在询问,在给定的 ID 内,与数据关联的一组 P_id 是否从一个日期更改为下一个日期? (不确定位置列在哪里适合。)如果是这样,我将应用 Map(lambda element: (element['ID']: element)) 和 GBK 它将为您提供单个 ID 的所有元素,然后在 compare_pos 中,您可以进行进一步的分组(例如,使用 defaultdict(set) 将 Dates 映射到 P_ids,然后遍历排序的键以查看迄今为止发生的变化。)
    • 另外,如果您知道日期中没有间隔(并且有太多的日期-P_id 对无法放入单个 ID 的内存中),您可以生成两个表,一个带有 (ID, Date) 作为键,另一个以 (ID, Date+1) 作为键,并执行 CoGropuByKey,这将产生一个 PCollection,其中每个连续日期对都有一个元素。
    【解决方案2】:

    我可能对 OP 的解释有误,但如果 @robertwb 的建议不起作用,请尝试按以下方式分组:

    | "Create k, v tuple" >> beam.Map(
                        lambda elem: ((elem["P_id"], elem["ID"]), [elem["Date"], elem["position"]]))
    | "Group by key" >> beam.GroupByKey()
    

    这将输出以下结构:

    (('rt56', 'abc'), [['2019-08-01', 5], ['2019-08-02', 2]])
    (('rt57', 'abc'), [['2019-08-01', 6], ['2019-08-02', 4]])
    (('rt58', 'abc'), [['2019-08-01', 7], ['2019-08-02', 7]])
    

    这应该允许您单独比较生成的 PCollection 中的每个元素,而不是交叉比较 PCollection 中的元素。如果我是正确的,这应该更适合 Beam 的执行模型。

    这是基于我的假设,即您要检查给定 P_id 的位置是否在两个日期之间发生了变化。

    【讨论】:

    • 是的,看起来不错,但是我如何比较它们之间的位置,因为当我做 element[1] 时,它会将所有三行放在一列中,使其成为单个元组
    猜你喜欢
    • 2022-11-29
    • 1970-01-01
    • 1970-01-01
    • 2016-06-22
    • 2020-12-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-19
    相关资源
    最近更新 更多