【问题标题】:Generate graph representation using PCollections in Apache Bean, Dataflow在 Apache Bean、Dataflow 中使用 PCollections 生成图形表示
【发布时间】:2020-06-24 18:19:57
【问题描述】:

我在以下架构中有一个数据源 - “人员 ID”、“地址 ID”。我已将数据加载到 PCollection 中。

每个人可以有多个地址ID,每个地址可以分配给多个人。我要查找的是所有相关人员或共享相同地址 ID 的人员,而不仅仅是在第一级到“n”级。

假设 P1 对 A1 和 A2 有一个边,P2 对 A2 和 A3 有一个边,P3 对 A3 和 A4 有一个边。在这种情况下,如果我绘制图表,我可以发现 P1 与 P2 相关,因为两者共享 A2。而且P1也与P3相关,因为P1通过A2与P2相关,而P2通过A3与P3有关系。

我的最终目标是找到这群有关系的人(通过地址 ID)。到目前为止,我所做的是尝试使用 Join.innerJoin 来形成临时表结构并将其循环到我们需要的任何级别。

PCollection<PeopleAddress> PA =  readEdges(); // 
PCollection<KV<String, PeopleAddress>> KAddressPA = transform(PA); // String is the address ID

PCollection<KV<String, KV<PeopleAddress, PeopleAddress>> data = Join.innerJoin(KAddressPA, KAddressPA);

//Above PCollection will give all first level edges, from this we will form a PeopleToPeople connection

PCollection<PeoplePeopleConnection> PP = getConnection(data); // From LHS and RHS we will read the ids and store. 

// With the new set of People People Connection we can  get new set of PeopleAddress edges..
Class PeoplePeopleConnection { String basePId; String cPId; }
Class PeopleAddress { String pId; String  aId; }

我正在考虑将上述代码循环 n 次以获得 N 级连接。但感觉有点过度劳累。很多边缘是重复的。我想知道有没有办法在 PCollection 中做到这一点。就像当我们找到与现有 Person 的地址的连接时,如何将其链接到现有的 person 对象。一些如何将新的 PeopleConnection 或 PeopleAddress 连接集合并回单个 PColleciton。

解决问题的不同视角?

【问题讨论】:

  • 听起来您希望将您的一组人分成连接的组件,对吗?给定节点 A 和 B,如果它们之间存在任何路径,则应将它们组合在一起。对吗?
  • 你的输出如何?类似人员 ID、人员 ID_2、级别?
  • @Pablo 是的,你说的是正确的。我想我们是否可以通过地址 ID 找到两个人之间的路径。我需要他们组合在一起
  • @rmesteves 实际上我并不关心级别,PeId1,PeId2 - 这是我所期待的。最后我可以做一个分组并找到PeId的所有关系。
  • 嗯,你知道两个节点之间的最大距离是多少吗?如果两个节点之间没有最大距离,则该算法将需要对单个输入进行多次迭代。 ||跟进:你的输出是什么样的?连接对的列表?例如(P1, A1), (P2, A1), (P3, A1) -> (P1, P2), (P2, P3), (P3, P1)?

标签: java google-cloud-dataflow apache-beam dataflow


【解决方案1】:

好的,到目前为止,您将有成对的人住在同一个地址,对吧?

PCollection<PeoplePeopleConnection> PP = getConnection(data);

这些对形成一个没有地址的图表 - 只有人,并且有distance=1。我喜欢这样,因为它可以让我们专注于人,而放弃地址。

那么,给定(P1, P2)(P2, P3) - 我们如何也能得到(P1, P3)

我们可以这样做:

PCollection<KV<String, String>> twoWayPairs = PP.apply(
      FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId), 
                                       KV.of(pair.cPId, pair.basePId))));

然后,我们可以像以前一样加入他们:

PCollection<KV<String, Iterable<String>> groupedData = twoWayPairs
            .apply(GroupByKey.create());

给定 (P1, P2)(P2, P3) 作为输入,这将返回 (P2, [P1, P3])(P1, [P2])(P3, [P2])。从这对中,我们可以推导出(P1, P3) 作为distance=2 邻居的列表。

groupedData.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
    List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
    if (newPairs.size() == 0) {
      return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
                      KV.of(neighbors.getValue().get(0), neighbors.getKey()));
    } else {
      return newPairs;
    }
  });

为什么要检查newPairs 是否为空?因为当 newPairs 元素为空时,我们会遇到不与其他元素链接的情况(例如前面的 (P1, [P2]))。

所以,最后,你应该能够做这样的事情:

// We get the distance=1 elements:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
      FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId), 
                                       KV.of(pair.cPId, pair.basePId))));

for(int i = 1; i < MAX_DISTANCE; i++) {
  twoWayPairs = twoWayPairs
      .apply(GroupByKey.create())
      .apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
          List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
          if (newPairs.size() == 0) {
              return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
                              KV.of(neighbors.getValue().get(0), neighbors.getKey()));
          } else {
              return newPairs;
    }
  });
}

这应该有助于生成具有distance&lt;N 的邻居。

考虑到在这种情况下,被洗牌的数据显着增加,所以在走很远的距离之前要小心。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-25
    • 1970-01-01
    • 2020-12-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多