【问题标题】:How to run batch hive queries in spark scala如何在 spark scala 中运行批处理配置单元查询
【发布时间】:2021-11-17 03:28:29
【问题描述】:

我正在使用以下代码从我的 spark 作业中循环执行多个 hive 查询

implicit val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

 val bizEventFolders = fs.listStatus(outputPath)
    bizEventFolders.foreach(folder => {
      val filePath = folder.getPath().toString
      if(filePath.contains(outputDir+"biz_evnt_key=")){
        val bizEventKey = filePath.replaceAll(outputDir+"biz_evnt_key=","")
        val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour,biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
        sparkSession.sql(addPartitionHiveQuery)
        logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
      }
    })

问题是,我必须一个接一个地运行多个这样的查询来将所有分区添加到 HIVE 表中,我怎样才能一次提交所有查询来触发而不是一个一个地触发它们?

【问题讨论】:

    标签: scala apache-spark hive apache-spark-sql


    【解决方案1】:

    您可以使用Scala Futures 或其他并行 AP 并行运行这些查询。

    一个简单的解决方案可能是使用par

    bizEventFolders.par.foreach(folder => {
          val filePath = folder.getPath().toString
          if(filePath.contains(outputDir+"biz_evnt_key=")){
            val bizEventKey = filePath.replaceAll(outputDir+"biz_evnt_key=","")
            val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour,biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
            sparkSession.sql(addPartitionHiveQuery)
            logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
          }
        })
    

    通过在顺序集合(例如列表)上调用par 方法,它变成了一个并行集合,并且可以以与顺序集合相同的方式使用它。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-04-30
      • 2015-04-12
      • 1970-01-01
      • 2023-03-18
      • 2020-06-30
      • 1970-01-01
      相关资源
      最近更新 更多