【问题标题】:Apache Spark Moving AverageApache Spark 移动平均线
【发布时间】:2014-06-17 14:42:33
【问题描述】:

我在 HDFS 中有一个巨大的文件,其中包含时间序列数据点(雅虎股票价格)。

我想找到时间序列的移动平均线,我该如何编写 Apache Spark 作业来做到这一点。

【问题讨论】:

  • 移动平均对于 Spark 和任何分布式系统来说都是一个棘手的问题。当数据分布在多台机器上时,会有一些跨分区的时间窗口。我认为关键是在分区的开头和结尾复制数据点。我会想办法在 Spark 中做到这一点。
  • 感谢@DanielDarabos
  • 为什么不能通过遍历RDD来实现呢?这会按顺序返回分区。
  • 这与 @Arwind 的答案相同,但用 Java 编写:stackoverflow.com/questions/31965615/…
  • @Victor 那是一年后的事了!!

标签: time-series hdfs moving-average apache-spark


【解决方案1】:

Spark 1.4 introduced windowing functions,这意味着您可以按如下方式进行移动平均使用 rowsBetween 调整窗口

val schema = Seq("id", "cykle", "value")
 val data = Seq(
        (1, 1, 1),
        (1, 2, 11),
        (1, 3, 1),
        (1, 4, 11),
        (1, 5, 1),
        (1, 6, 11),
        (2, 1, 1),
        (2, 2, 11),
        (2, 3, 1),
        (2, 4, 11),
        (2, 5, 1),
        (2, 6, 11)
      )

val dft = sc.parallelize(data).toDF(schema: _*)

dft.select('*).show

// PARTITION BY id  ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)

val x = dft.select($"id",$"cykle",avg($"value").over(w))
x.show

输出(齐柏林飞艇):

schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
|  1|    1|    1|
|  1|    2|   11|
|  1|    3|    1|
|  1|    4|   11|
|  1|    5|    1|
|  1|    6|   11|
|  2|    1|    1|
|  2|    2|   11|
|  2|    3|    1|
|  2|    4|   11|
|  2|    5|    1|
|  2|    6|   11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
|  1|    1|                                                        4.333333333333333|
|  1|    2|                                                                      6.0|
|  1|    3|                                                                      5.0|
|  1|    4|                                                                      7.0|
|  1|    5|                                                                      6.0|
|  1|    6|                                                        7.666666666666667|
|  2|    1|                                                        4.333333333333333|
|  2|    2|                                                                      6.0|
|  2|    3|                                                                      5.0|
|  2|    4|                                                                      7.0|
|  2|    5|                                                                      6.0|
|  2|    6|                                                        7.666666666666667|
+---+-----+————————————————————————————————————+

【讨论】:

  • 也可以看看这篇博文:xinhstechblog.blogspot.de/2016/04/… 比官方公布的窗口函数工作原理更实用的解释。
  • 如果您没有任何可以划分的依据,也就是说,如果您需要对所有数据执行移动平均,会发生什么?这是我的情况,因为我有时间序列数据并且没有什么可以划分的。在这种情况下,所有数据都将被移动到一个节点,这是一个问题,对吧?如何克服这个问题?
  • @Marko 数据是什么?看看近似的 quarties 和 spark-ts databricks.com/blog/2016/05/19/… github.com/sryza/spark-timeseries
  • 感谢一年后的回答 :) 数据代表多元时间序列。也就是说,每一列都是在时间期间测量的参数。不确定如何近似可以帮助我处理移动平均线,我会避免使用这个库,因为它是第三方并且不再开发。也许还有其他想法?我害怕的问题真的存在吗?如果我没有要分区的内容,我会在一个节点上获取所有数据吗?
  • 我认为使用了默认分区器stackoverflow.com/questions/34491219/…
【解决方案2】:

您可以使用 MLLIB 中的滑动功能,这可能与丹尼尔的回答相同。在使用滑动功能之前,您必须按时间对数据进行排序。

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

【讨论】:

  • 太棒了!这和我的回答不完全一样。它从每个分区中获取第一个(window-1)元素,并使用这少量数据来填补空白。 (code)
  • It Map Reduce,我们需要一个自定义 InputFormat,它从下一个拆分中读取一些额外的行来读取完整的窗口,就像 TextInputFormat 从下一个拆分中读取一些附加内容一样。
  • MapMethod 可以继续维护一个寡妇大小的值列表。也就是说,直到没有达到大小,继续累积到列表中。达到大小后,计算平均值并执行 context.write()。在 Next Map() 方法调用中,将新值添加到列表中,从列表中删除最旧的值并计算平均值并执行 context.write()。 SPARK,不控制在任务中累积值,并管理其计数等
  • .sliding(3).map(curSlice => (curSlice.sum / curSlice.size)) 看起来很简单。 curSlice 的数据类型是什么。如果值不是数字而是文本,并且我们需要在窗口中查找最常用的单词,我们是否可以让 curSlice 支持所有数据类型。 ? @Arvind ?
【解决方案3】:

移动平均对于 Spark 和任何分布式系统来说都是一个棘手的问题。当数据分布在多台机器上时,会有一些跨分区的时间窗口。我们必须在分区开始时复制数据,这样计算每个分区的移动平均值才能得到完整的覆盖。

这是在 Spark 中执行此操作的一种方法。示例数据:

val ts = sc.parallelize(0 to 100, 10)
val window = 3

一个简单的分区器,将每一行放入我们通过键指定的分区中:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}

使用复制到前一个分区的第一行 window - 1 创建数据:

val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  val overlap = p.take(window - 1).toArray
  val spill = overlap.iterator.map((i - 1, _))
  val keep = (overlap.iterator ++ p).map((i, _))
  if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values

只需计算每个分区的移动平均值:

val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator
  val news = sorted.iterator
  var sum = news.take(window - 1).sum
  (olds zip news).map({ case (o, n) => {
    sum += n
    val v = sum
    sum -= o
    v
  }})
})

由于重复的片段,这将不会有覆盖范围的差距。

scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true

【讨论】:

  • 最后一步的排序可能是不必要的。无论如何,数据似乎都是排序的。我不知道是否可以保证重新分区以这种方式运行。
  • 为什么不能通过遍历RDD来实现呢?这会按顺序返回分区......然后你只需要复制RDD末端的部分。我想知道 updateStateByKey 是否有助于让事情变得更容易。
  • 这是一种有趣的方法,但您做出了一个冒险的假设,即没有空 / 到短分区。例如:val m = Map(1 -> (0 to 50).toIterator, 4 -> (51 to 100).toIterator).withDefault(i => Iterator()); val ts = sc.parallelize(Seq.empty[Int], 10).mapPartitionsWithIndex((i, _) => m(i))
  • 我使用类似于 herehere 的广播变量而不是分区器,并根据计数分配数据。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-09-01
  • 2013-12-22
  • 1970-01-01
  • 2022-01-26
  • 2011-06-29
  • 2019-04-24
  • 2012-05-24
相关资源
最近更新 更多