【问题标题】:How to run two SparkSql queries in parallel in Apache Spark如何在 Apache Spark 中并行运行两个 SparkSql 查询
【发布时间】:2016-03-17 21:53:40
【问题描述】:

首先,让我在spark上的.scala文件中编写我要执行的部分代码。

这是我的源文件。它有四个字段的结构化数据

val inputFile = sc.textFile("hdfs://Hadoop1:9000/user/hduser/test.csv")

我已经声明了一个案例类来将文件中的数据存储到具有四列的表中

case class Table1(srcIp: String, destIp: String, srcPrt: Int, destPrt: Int)

val inputValue = inputFile.map(_.split(",")).map(p => Table1(p(0),p(1),p(2).trim.toInt,p(3).trim.toInt)).toDF()

inputValue.registerTempTable("inputValue")

现在,假设我想运行以下两个查询。我如何并行运行这些查询,因为它们是相互独立的。我觉得,如果我可以并行运行它们,它可以减少执行时间。现在,它们是串行执行的。

val primaryDestValues = sqlContext.sql("SELECT distinct destIp FROM inputValue")
primaryDestValues.registerTempTable("primaryDestValues")
val primarySrcValues = sqlContext.sql("SELECT distinct srcIp FROM inputValue")
primarySrcValues.registerTempTable("primarySrcValues")

primaryDestValues.join(primarySrcValues, $"destIp" === $"srcIp").select($"destIp",$"srcIp").show(

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    也许你可以看看期货/承诺的方向。 SparkContext submitJob 中有一个方法可以返回你未来的结果。因此,您可以解雇两个工作,然后从期货中收集结果。

    我还没有尝试过这种方法。只是一个假设。

    【讨论】:

    • 我已经尝试过,但无法从未来获取返回值。会尝试更多地工作,如果上帝愿意,成功,会回应。
    • @Ahmad 在解雇两个或更多工作并收到 Futures 后,您想要做的可能是获得这些工作的汇总未来或映射结果。最正确的方法是在结果链上为成功和失败案例定义处理程序,或者您可以只映射结果或使用。有关加入期货的更多详情,请查看this thread
    【解决方案2】:

    不知道为什么要首先使用 sqlContext,并且不要让事情变得简单。

    val inputValue = inputFile.map(_.split(",")).map(p => (p(0),p(1),p(2).trim.toInt,p(3).trim.toInt))
    

    假设 p(0) = destIp, p(1)=srcIp

    val joinedValue = inputValue.map{case(destIp, srcIp, x, y) => (destIp, (x, y))}
                      .join(inputFile.map{case(destIp, srcIp, x, y) => (srcIp, (x, y))})
                      .map{case(ip, (x1, y1), (x2, y2)) => (ip, destX, destY, srcX, srcY)}
    

    现在它会被并行化,你甚至可以使用 colasce 控制你想要的分区数量

    【讨论】:

    • 我使用 sql 上下文是因为,我有许多复杂的查询来计算中位数、平均值等。所以,我认为使用 SQL 更容易实现。问题中的两个查询是许多查询的一部分。
    • 我不建议这样做,除非绝对必要。思路很简单,当你使用 SQL 时,你为 spark 处理添加了额外的层,这是开销。无论如何,来回答你的问题。 Spark 一次只能执行一项任务,当您到达案例展示中的触发函数时,该任务及其所有链式操作都会被调用。我认为您的查询未并行运行的核心原因是 sqlContext 是线程安全的。
    【解决方案3】:

    你可以跳过这两个DISTINCT,在最后做一个:

    inputValue.select($"srcIp").join(
      inputValue.select($"destIp"), 
      $"srcIp" === $"destIp"
    ).distinct().show
    

    【讨论】:

    • 感谢您的回答,但这只是一个示例。我需要知道并行执行查询的方法。
    • 你真的不能。你可以加入他们,仅此而已。那是你的选择。以一种或另一种方式加入。
    • 我已经阅读了“Future”方法,但我没有正确理解。我认为它可以用来并行运行查询。
    【解决方案4】:

    这是一个很好的问题。这可以使用数组中的par 并行执行。为此,您已相应地自定义您的代码。

    声明一个包含两个项目的数组(您可以根据自己的意愿命名)。在需要并行执行的每个 case 语句中编写代码。

    Array("destIp","srcIp").par.foreach { i => 
    {
        i match {
          case "destIp" => {
            val primaryDestValues = sqlContext.sql("SELECT distinct destIp FROM inputValue")
            primaryDestValues.registerTempTable("primaryDestValues")
          }
          case "srcIp" => {
            val primarySrcValues = sqlContext.sql("SELECT distinct srcIp FROM inputValue")
            primarySrcValues.registerTempTable("primarySrcValues")
          }}}
    }
    

    一旦两个case语句的执行完成,你下面的代码就会被执行。

    primaryDestValues.join(primarySrcValues, $"destIp" === $"srcIp").select($"destIp",$"srcIp").show()
    

    注意:如果您从代码中删除par,它将按顺序运行

    另一个选项是在代码中创建另一个 sparksession 并使用该 sparksession 变量执行 sql。不过这个风险不大,已经用得很小心了

    【讨论】:

      猜你喜欢
      • 2018-04-29
      • 1970-01-01
      • 1970-01-01
      • 2017-04-12
      • 1970-01-01
      • 2019-11-13
      • 1970-01-01
      • 2014-10-14
      • 1970-01-01
      相关资源
      最近更新 更多