【问题标题】:Fast conditional join in SparkSpark 中的快速条件连接
【发布时间】:2016-05-25 20:01:57
【问题描述】:

我正在尝试使用条件查询在 Spark 中进行快速扩充。

我有两个 key/val 数据集:“Event Data”和“Session Map”。 “会话映射”用于找出谁在两个时间戳之间使用给定 IP。 “事件数据”是大量事件的集合,带有 IP 和时间戳,需要与“会话映射”相关联,以丰富用户名。

有没有一种有效的方法可以根据 Spark 中的 Session Map 或其他方式丰富事件数据?

会话图:

(IP, start_time, end_time) -> Name
(192.168.0.l, 2016-01-01 10:00:00, 2016-01-01 22:00:00) -> John
(192.168.0.l, 2016-01-01 22:00:01, 2016-01-02 04:35:00) -> Dana
(10.0.0.12,   2016-01-02 06:00:13, 2016-01-02 09:23:24) -> John
...

事件数据:

IP -> timestamp
192.168.0.l, 2016-01-01 10:00:00
192.168.0.l, 2016-01-01 10:00:01
192.168.0.l, 2016-01-01 10:00:02
192.168.0.l, 2016-01-01 10:05:23
...
192.168.0.l, 2016-01-01 22:00:01 
192.168.0.l, 2016-01-01 22:12:35 
192.168.0.l, 2016-01-01 04:12:00
...

【问题讨论】:

  • 这两个集合都适合内存吗?
  • 不幸的是,它不适合内存
  • 您尝试过常规加入吗?您使用什么 API 来处理您的数据?常规 RDD 还是 DataFrame?
  • 我相信 Spark 中的常规加入需要完全匹配。我错了吗?数据是常规 RDD。
  • 啊,我没有正确阅读您的问题。我有一个想法,我会发布它作为答案。快速提问,每个 IP 的会话/事件分布是什么?

标签: database hadoop join apache-spark mapreduce


【解决方案1】:

您可以做的是完全加入 IP 上的两个集合。这将生成一个非常大的表格,然后您可以对其进行过滤以仅保留事件落在会话范围内的组合。所以:

IP_RDD = (IP,(start_time, end_time, name))
Session_RDD = (IP, timestamp)
joined_RDD = IP_RDD.join(Session_RDD)
joined_RDD = joined_RDD.filter(end_time<=timestamp<=end_time)

这个伪代码应该可以做到这一点,您需要编写一个函数来检查时间戳并将其恢复为正确的格式。我不知道这是否足够快,但除非会话 start_time 和 end_time 在设定的时间(比如每 2 小时一个新会话,即),我没有看到更好的方法。

【讨论】:

  • 如果其中一个 IP 有很多,很多个条目——那会在连接中崩溃吗?
  • 我不确定,我认为由于过滤器,它不会保留那些不匹配过滤器的东西,它应该没问题。
【解决方案2】:

我觉得更简单的方法是先用zipWithIndex(降低join操作的成本):

 val SessionUnion = Session.zipWithIndex.map(x=>(x._1.IP,x._1.start_time,x._2)) //This should give you a RDD of IP,Date,Index
 val UnionEvent = Event.map(x=>(x.IP,x.timeStamp,0.toLong)).union(SessionUnion)

这基本上用你的所有会话和事件制作了一个平面表 - 但问题是会话有索引,而事件只有 0。

现在利用 RDD 中的排序

val sortAns = UnionEvent.map(x=>((x.IP,x.timeStamp),x._3)).sortBy(_._1)

这种排序应该为您对齐 IP 和日期范围,现在您需要做的就是调整索引:

 IP1,DateFromSession1,Index1
 IP1,DateFromEvent1,0
 IP1,DateFromEvent2,0
 IP1,DateFromSession2,Index2
 ...

所以你可以做一个Foldleft,这样所有为0的索引都被以前的索引替换,所以你最终得到IP1,DateFromEvent,IndexFromSession。或者我想还有其他一些有趣的方法可以做到这一点。

将索引附加到事件后,您现在可以使用索引进行连接,这应该非常快。

【讨论】:

  • 如何在 spark / Hadoop 中进行 foldLeft?
  • spark 也使用 Scala 库,左折叠在 Scala 库中,你应该可以使用它,但不推荐,因为它意味着收集整个 RDD 以掌握 - 然而,我只是提供一种替代方法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-03-30
  • 2017-07-18
  • 2017-03-14
相关资源
最近更新 更多