【问题标题】:Approximate long-term sliding averages in Apache Spark StreamingApache Spark Streaming 中的近似长期滑动平均值
【发布时间】:2017-05-12 04:48:37
【问题描述】:

我想使用 Apache Spark Streaming 计算一周窗口大小的滑动平均值。结果应及时交付,即具有亚分钟延迟。在我的情况下,存储一周的流数据是不切实际的,尽管这需要计算准确的结果(如果我错了,请纠正我)。

因此,我的目标是某种近似。我的想法是使用 1h 翻转窗口来生成 1h pre-avg 结果流s_1。然后我会在s_1 上使用一个持续时间为一周的滑动平均值来计算 1 周预平均结果s_2。除了 s_1s_2 我还有另一个流 s_3 包含超过 1 小时的滑动平均结果。然后我会加入流s_2s_3,对于加入元组(t_2, t_3),我会发出(t_2 + t_3) / 2。对于每个平均元组,我还将附加包含的元组的最小和最大时间戳。我会使用这些时间戳来防止 s_2 和 s_3 元组重叠。例如:

s_2 tumbling window size 2 (tuples)
s_3 sliding window size 2, interval 1 (tuples)
stream 3  4  9  8  7     

time    s_2    2_3    out
1        -      3      3
2        -     3.5    3.5
3       3.5    6.5    3.5   the s_3 tuple 6.5 is ignored because  min_timestamp(6.5) <= max_timestamp(3.5)
4       3.5    8.5     6   (compute (3.5 + 8.5) / 2
5        6     7.5     6    the s_3 tuple 7.5 is ignored because  min_timestamp(7.5) <= max_timestamp(6)

虽然我能够让它与 Apache Storm 一起使用,但我不知道如何使用 Spark,因为由于滑动间隔不同,Spark 中不允许连接 s_2 和 s_3。

所以问题 #1 是:如何在 Spark Streaming 中实现这一点?

问题 #2:您能想出更好的方法来在流处理系统中有效地计算长时间内的滑动平均值吗?

【问题讨论】:

    标签: spark-streaming


    【解决方案1】:

    有许多策略可以在流上以相当高的准确度生成近似答案。我们使用的一种策略是stratified sampling 在 spark 的无限流上。 我们在一个名为 SnappyData 的开源项目中将分层采样引入 Apache Spark。好吧,Spark 中还缺少其他几项内容。 SnappyData 可以在流上保持统一的随机样本,但通过允许开发人员选择流中重要的列/维度来确保高精度。因此,例如,在您的示例中,确保每分钟或每小时捕获足够的样本。样本以 Spark Dataframe/Column 表的形式可见并且可查询。当执行像 avg/sum/count/etc 这样的聚合查询时,它会使用一堆算法来使用一小部分资源和时间来计算答案。

    这是 Snappydata 中的伪代码。

    Create sample table MyInfiniteStream on <Stream> options (qcs 'min(timestamp), fraction '0.01')
        // Of course, you can use the Dataframe api to do this instead of SQL too. 
        // your DStream <Stream> is registered with SnappyData
        // min(timestamp) tells which columns to use for stratification
        // fraction indicates what percentage of the input data to retain in the sample. 
    

    然后,您可以直接对此运行 Spark SQL 查询,无论是否有错误约束。现在的好处是您的时间间隔可以非常精细或粗略。

    select avg(myMeasureColumn), dimension d from MyInfiniteStream group by d with Error 0.1
    // this would ensure the result is always at least 90% accurate. 
    select avg(myMeasureColumn), dimension d from MyInfiniteStream where timestamp >x and timestamp < y group by d with Error 0.1
    

    您可以更好地理解here 的想法。 SnappyData 与 Spark 完全兼容。

    您也可以尝试直接在 Spark 中实现,尽管没有规定进行在线采样(即直接在流上),也没有内置算法来计算具有置信区间的错误。查看 Dataset 上的“示例”方法。

    【讨论】:

    • 除上述之外,可以使用“创建样本表”中的 timeInterval 选项来控制采样窗口(在此之后刷新当前集合),例如选项(qcs '...',分数 '0.01',timeInterval '120s')。还明确支持时间戳列,它可以代替挂钟用于 timeIntervals。这可以使用“timeSeriesColumn ''”选项设置为“创建示例表”,它应该是时间戳或日期类型。然后,您可以将“qcs”保留为您打算对其进行分组/过滤以获得更高准确性的列集。
    猜你喜欢
    • 2014-06-17
    • 1970-01-01
    • 1970-01-01
    • 2015-04-18
    • 2016-01-13
    • 2017-02-20
    • 2019-10-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多