【问题标题】:Apache beam left join between 2 pcollectionsApache Beam 在 2 个 pcollections 之间左连接
【发布时间】:2021-11-22 15:33:36
【问题描述】:

我正在尝试在 pcollection 与其副本之间进行左连接,所以我正在寻找这样的东西:

((colA, colB, colC, colD))
(a,b,e,f)
(a,b,g,h)
(a,b,i,j)
(c,d,k,l)
(c,d,m,n)

在 colA 和 colB 上进行左连接,结果如下所示:

(e,f, g,h)
(e,f, i,j)
(g,h, i,j)

(k,l, m,n)

我来使用 apache Beam 数据框解决它:

df = to_dataframe(pcol)

with dataframe.allow_non_parallel_operations():
     res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB'])
pcoll = to_pcollection(res)

它工作正常,但是当我必须处理大行的 pcollection 时,出现内存不足错误(这是预期的)

现在我正在寻找 df.merger() 的替代方法,但使用 pcollection,所以我不会遇到内存错误

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    如果有人对此问题感兴趣

    我想另一种逻辑。首先,我按键对记录进行分组,如下所示:

    ((a,b),(e,f))
    ((a,b),(g,h))
    ((a,b),(i,j))
    ((c,d),(k,l))
    ((c,d),(m,n))
    

    之后我使用GroupByKey 组合它们
    在下一个转换中我尝试循环抛出所有可能的组合

    class combineLev(beam.DoFn):
        #this act like df.merge
        def process(self, element):
            (k, v) = element
            v_ = list(v)
            for i in range(len(v_)):
                for j in range(i,len(v_)):
                    if v_[i][1] != v_[j][1]:
                        #print(list_[i][1], list_[j][1])
                        yield (v_[i], v_[j])
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-08-31
      相关资源
      最近更新 更多