【问题标题】:Spark : how to parallelize subsequent specific work on each dataframe partitionsSpark:如何并行化每个数据帧分区上的后续特定工作
【发布时间】:2018-10-01 18:13:58
【问题描述】:

我的 Spark 应用如下:

1) 使用 Spark SQL 对数据框“dataDF”执行大型查询

2)“dataDF”中涉及的foreach分区:

2.1) 获取关联的“过滤”数据帧,以便只有分区关联数据

2.2) 对“过滤”数据帧进行特定工作并写入输出

代码如下:

val dataSQL = spark.sql("SELECT ...")
val dataDF = dataSQL.repartition($"partition")

for {
  row <- dataDF.dropDuplicates("partition").collect
} yield {

   val partition_str : String = row.getAs[String](0)
   val filtered = dataDF.filter($"partition" .equalTo( lit( partition_str ) ) )

   // ... on each partition, do work depending on the partition, and write result on HDFS

   // Example :

   if( partition_str == "category_A" ){

       // do group by, do pivot, do mean, ...
       val x = filtered
         .groupBy("column1","column2")
         ...

       // write final DF
       x.write.parquet("some/path")

   } else if( partition_str == "category_B" ) {

       // select specific field and apply calculation on it
       val y = filtered.select(...)

       // write final DF
       x.write.parquet("some/path")

   } else if ( ... ) {

      // other kind of calculation
      // write results

   } else {

      // other kind of calculation
      // write results

   }

}

这样的算法可以成功。 Spark SQL 查询是完全分布式的。然而,在每个结果分区上完成的特定工作是按顺序完成的,结果效率低下,尤其是因为与分区相关的每个写入都是按顺序完成的。

在这种情况下,有什么方法可以将“for yield”替换为并行/异步的东西?

谢谢

【问题讨论】:

  • 您可以在 mapflatMap 函数参数中对工作人员进行操作。
  • 您好,正如我所说,我必须在生成的 spark sql 数据帧的每个分区上做特定的工作(应用特定的操作),从而导致不同的计算和输出模式取决于分区。我的目标是让不同的计算依赖于分区并行工作。我不明白如何用简单的地图做到这一点。你能详细说明一下吗?
  • 确实,'for' 表示顺序的,因此不是分布式的。不知道你为什么选择那个 - 可能需要阐明这个逻辑是什么。
  • 嘿。我添加了一些伪代码。我想要实现的是“只是”让 if 语句中的每个块并行运行。一种方式,每个最终逻辑应用于来自公共数据帧的每个特定分区并行运行。谢谢
  • 有一些关于在大数据中使用 .par 的帖子。

标签: scala apache-spark


【解决方案1】:
  1. 如果使用特定环境所需的特定逻辑写入 Hadoop 范围之外的数据存储,您可以使用 foreachPartition。

  2. 其他地图等

  3. .par 并行集合 (Scala) - 但要谨慎使用。用于读取文件并对其进行预处理,否则可能被认为是有风险的。

  4. 线程。

  5. 1234563执行者通过 SPARK 发送给工人。但是,例如,您不能按照下面的方式为工作人员编写 spark.sql - 最后由于我刚刚在文本块中得到的一些格式方面的错误。见文末。
  6. 同样 df.write 或 df.read 也不能在下面使用。您可以做的是将单独的执行/变异语句写入 ORACLE、mySQL。

希望这会有所帮助。

rdd.foreachPartition(iter => {
       while(iter.hasNext) {
         val item = iter.next()
         // do something
         spark.sql("INSERT INTO tableX VALUES(2,7, 'CORN', 100, item)")
         // do some other stuff
  })

RDD.foreachPartition (records => {       
  val JDBCDriver = "com.mysql.jdbc.Driver" ...
  ...
  connectionProperties.put("user", s"${jdbcUsername}")
  connectionProperties.put("password", s"${jdbcPassword}")
 val connection = DriverManager.getConnection(ConnectionURL, jdbcUsername, jdbcPassword)
  ...
  val mutateStatement = connection.createStatement()
  val queryStatement = connection.createStatement()
  ...
      records.foreach (record => { 
              val val1 = record._1
              val val2 = record._2
              ...
              mutateStatement.execute (s"insert into sample (k,v) values(${val1}, ${nIterVal})")      
            })
  }            
)   

【讨论】:

  • 谢谢。我确实更喜欢避免有风险的 PAR。您能否根据我的用例添加 foreachPartition 和 map 的示例?我不确定是否理解正确的方法。特别是我不明白它如何与地图一起使用,因为地图需要返回一个值。
  • 我不相信你的东西会像你上面展示的那样工作,它更像是 3GL 风格而不是 SPARK。让我知道您之前的情况以及您如何评价答案。成功,因为其中的一些东西并不总是那么容易,但你需要了解驱动程序和执行程序以及 spark.context 的 SPARK 方式。
  • 您的答案是质量。但是,在我的特定用例中,我不相信 map 或 foreachPartition 之类的方法会导致从迭代器中逐行读取,这变得难以使用。我不想每行将输出写入数据库行,而只是继续使用数据框拥有的设施,例如编写镶木地板文件。最后,如果对我来说 PAR 和 Threads 可能是我的最佳选择,那么您的答案中最相关的部分。不过,我更愿意在验证您的答案之前稍等片刻。
  • 这只是单独写入数据库的一个示例,同时表明 spark.sql 不能在执行程序中显式使用。这是关键点。我将在我的电脑上再举一个例子并发布。我来过这里,并且有一种方法可以遵循,Spark 的。
  • 好的,对于foreachPartition里面无法访问的spark.sql,我终于明白了:-) 谢谢!那么,如果我不想达到那么低的水平(必须逐行消耗东西),我有什么选择? PAR 和线程?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-19
  • 1970-01-01
  • 1970-01-01
  • 2017-04-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多