【问题标题】:Spark SQL + Streaming issuesSpark SQL + 流问题
【发布时间】:2015-08-10 17:36:05
【问题描述】:

我们正在尝试使用 Spark StreamingSpark SQL 实现一个用例,允许我们针对某些数据运行用户定义的规则(请参阅下文,了解数据如何被捕获和使用)。这个想法是使用 SQL 来指定规则并将结果作为警报返回给用户。基于每个传入事件批次执行查询似乎非常慢。如果有人能提出更好的方法来实现这个用例,我将不胜感激。另外,想知道 Spark 是在驱动程序还是工作程序上执行 sql?提前致谢。以下是我们为实现这一目标而执行的步骤 -

1) 从外部数据库加载初始数据集作为 JDBCRDD

JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);

2) 创建一个传入的 DStream(捕获对初始化数据的更新)

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
            FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);

JavaDStream<SomeState> incomingDStream = flumeStream.map(...);

3) 使用传入的 DStream 创建 Pair DStream

JavaPairDStream<Object,SomeState> pairDStream =
            incomingDStream.map(...);

4) 使用初始化的 RDD 作为基础状态,从 pair DStream 创建一个有状态的 DStream

JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);

JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);

5) 根据传入流中的值对更新后的状态运行用户驱动的查询

incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {

            @Override
            public Void call(JavaRDD<SomeState> events) throws Exception { 

                updatedStateRDD.count();
                SQLContext sqx = new SQLContext(events.context());
                schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
                schemaDf.registerTempTable("TEMP_TABLE");
                sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);

                //collect the results and process and send alerts
                ...

            }
);

【问题讨论】:

    标签: apache-spark spark-streaming apache-spark-sql apache-spark-1.4


    【解决方案1】:

    第一步应该是确定哪个步骤花费的时间最多。 请查看 Spark Master 用户界面并确定哪个步骤/阶段花费的时间最多。

    您可以考虑以下几个最佳实践 + 我的观察结果:-

    1. 使用单例 SQLContext - 参见示例 - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
    2. updateStateByKey 在有大量键的情况下可能是内存密集型操作。您需要检查由 updateStateByKey 函数以及它是否适合给定 记忆。
    3. 您的 GC 表现如何?
    4. 您真的在使用“initialRDD”吗?如果没有,则不要加载它。如果它是静态数据集,则将其缓存。
    5. 还要检查 SQL 查询所用的时间。

    这里还有一些可以帮助您的问题/领域

    1. 什么是 DStream 的 StorageLevel?
    2. 集群大小和集群配置
    3. Spark 版本?

    最后 - ForEachRDD 是一个输出操作,它在 Driver 上执行给定的功能,但 RDD 可能会执行操作,并且这些操作会在工作节点上执行。

    您可能需要阅读这篇文章以获得关于输出操作的更好解释 - http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

    【讨论】:

      【解决方案2】:

      我也面临同样的问题,如果您有相同的解决方案,请告诉我?虽然我在下面的帖子中提到了详细的用例。

      Spark SQL + Window + Streming Issue - Spark SQL query is taking long to execute when running with spark streaming

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2014-11-12
        • 2018-10-14
        • 2023-03-03
        • 1970-01-01
        • 1970-01-01
        • 2020-12-29
        • 2015-09-27
        • 1970-01-01
        相关资源
        最近更新 更多