【问题标题】:Distributed Computation for large data-data processing用于大数据数据处理的分布式计算
【发布时间】:2018-07-19 13:12:40
【问题描述】:

我有大量的时间序列数据,我想使用 spark 的并行处理/分布式计算进行数据处理。 要求是逐行查看数据以确定下面在所需结果部分下指定的组,如果没有执行者之间的某种协调,我真的无法得到火花来分发它

t- timeseries datetime sample,
lat-latitude,
long-longitude


例如:取一小部分样本数据集来解释案例

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28
30  27  28 

期望的输出应该是:

Lat-long    interval
(27,28) (0,10)
(29,49) (15,20)
(27,28) (25,30)

我可以使用这段代码得到想要的结果

val spark = SparkSession.builder().master("local").getOrCreate()

import spark.implicits._

 val df = Seq(
  (0, 27,28),
  (5, 27,28),
  (10, 27,28),
  (15, 26,49),
  (20, 26,49),
  (25, 27,28),
  (30, 27,28)
).toDF("t", "lat","long")

val dfGrouped = df
.withColumn("lat-long", struct($"lat", $"long"))

val wAll = Window.partitionBy().orderBy($"t".asc)

dfGrouped.withColumn("lag", lag("lat-long", 1, null).over(wAll))
.orderBy(asc("t")).withColumn("detector", when($"lat-long" === $"lag", 0)
    .otherwise(1)).withColumn("runningTotal", sum("detector").over(wAll))
.groupBy("runningTotal", "lat-long").agg(struct(min("t"), max("t")).as("interval"))
.drop("runningTotal").show
}

但是如果数据进入两个执行器,那么数据会是什么样子

执行器 1 中的数据:

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28

执行器 2 中的数据:

t   lat long
30   27  28


我应该如何为大量数据获得所需的输出。必须有更聪明的方法来做到这一点,通过执行者之间的某种协调来分配它以获得该结果。

请指导我正确的方向,我已经研究过相同的但无法找到解决方案。

PS:这只是一个示例。

【问题讨论】:

  • 你能用多台机器创建一个集群吗?然后数据集可以作为一个数据集可见,操作起来会更容易。
  • @wind 你能详细说明一下吗。
  • 由于我不理解您的描述,您有 2 个单独的数据集群(HDFS 或其他),并且您在连接这些数据时遇到问题。我说的对吗?
  • 不,这就像我在 cassandra 中有一个时间序列数据,我需要做一些并行处理,以便它何时分布在不同的分区中,然后处理应用于不同的分区,但最终我的目标是得到上述结果。
  • 示例示例是指我发布的数据@user8371915 我发布代码只是为了明确我想要什么样的结果,因为有时从解释中并不清楚操作员真正想要什么!我的目标是使用并行处理功能并获得所需的结果,如果在集群节点上运行,我无法通过上述代码获得。

标签: scala apache-spark distributed-computing


【解决方案1】:

您可以使用 UDAF 解决此问题。 首先,您可以添加一列,它代表您拥有的多个执行程序中分区的 t 列。类似 executorIndex = t % ((max(t) - min(t)) / numExecutors)。

然后您可以通过 executorIndex 应用您的 UDAF 分组。

您的 UDAF 需要存储一个带有字符串键(例如)的 Map,该键表示一个 lat 和 long 对,以及一个表示该 lat-long 键的 maxT 和 minT 的 int[]。

如果您需要更详细的解释,请询问。

希望对您有所帮助...

PS:我认为相同的经纬度之间存在一些时间关系,如果您正在跟踪某些运动,这是正常的......

【讨论】:

  • 感谢您的努力,但请提供更广泛的解释。
猜你喜欢
  • 1970-01-01
  • 2017-11-22
  • 1970-01-01
  • 2014-03-25
  • 2015-08-08
  • 1970-01-01
  • 1970-01-01
  • 2018-10-01
  • 1970-01-01
相关资源
最近更新 更多