【问题标题】:Apache Spark-SQL vs Sqoop benchmarking while transferring data from RDBMS to hdfsApache Spark-SQL vs Sqoop 基准测试,同时将数据从 RDBMS 传输到 hdfs
【发布时间】:2016-09-05 02:59:10
【问题描述】:

我正在研究一个必须将数据从 RDBMS 传输到 HDFS 的用例。我们使用 sqoop 对该案例进行了基准测试,发现我们能够在 6-7 分钟内传输大约 20GB 的数据。

当我尝试使用 Spark SQL 时,性能非常低(1 Gb 的记录从 netezza 传输到 hdfs 需要 4 分钟)。我正在尝试进行一些调整并提高其性能,但不太可能将其调整到 sqoop 级别(1 分钟内大约 3 Gb 的数据)。

我同意 spark 主要是一个处理引擎这一事实,但我的主要问题是 spark 和 sqoop 都在内部使用 JDBC 驱动程序,所以为什么在性能上有这么大的差异(或者我可能遗漏了一些东西)。我在这里发布我的代码。

object helloWorld {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local")
    val sc= new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
    val df2 =sqlContext.sql("select * from POC")
    val partitioner= new org.apache.spark.HashPartitioner(14)
    val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values
    rdd.saveAsTextFile("hdfs://Hostname/test")
  }
}

我查看了许多其他帖子,但无法得到关于 sqoop 内部工作和调整的明确答案,也没有得到 sqoop 与 spark sql 基准测试。请帮助理解这个问题。

【问题讨论】:

    标签: hadoop apache-spark-sql sqoop bigdata


    【解决方案1】:

    您可以尝试以下方法:-

    1. 从没有任何分区的 netezza 读取数据,并将 fetch_size 增加到一百万。

      sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC")
      
    2. 在将数据写入最终文件之前重新分区。

      val df3 = df2.repartition(10) //to reduce the shuffle 
      
    3. ORC 格式比 TEXT 更优化。将最终输出写入 parquet/ORC。

      df3.write.format("ORC").save("hdfs://Hostname/test")
      

    【讨论】:

      【解决方案2】:

      你使用了错误的工具来完成这项工作。

      Sqoop 将启动一系列进程(在数据节点上),每个进程都会与您的数据库建立连接(请参阅 num-mapper),并且每个进程都会提取数据集的一部分。我认为您无法使用 Spark 实现某种读取并行性。

      使用 Sqoop 获取数据集,然后使用 Spark 对其进行处理。

      【讨论】:

      • 这仍然是真的吗?在摄取 RDBMS 数据方面,sqoop 是否仍然比 spark 好。
      • 我认为这是错误的。根据下面的链接,Spark 可以进行并行读取操作。 link
      【解决方案3】:

      @amitabh 尽管标记为答案,但我不同意。

      一旦你在从 jdbc 读取数据时给出了分区数据的谓词,spark 将为每个分区运行单独的任务。在您的情况下,任务数应为 14(您可以使用 spark UI 确认)。

      我注意到您使用本地作为主服务器,它只会为执行程序提供 1 个核心。因此不会有并行性。你的情况是这样的。

      现在要获得与 sqoop 相同的吞吐量,您需要确保这些任务并行运行。理论上,这可以通过以下方式完成: 1. 使用 14 个执行器,每个执行器 1 个核心 2. 使用 1 个 14 核的执行器(频谱的另一端)

      通常情况下,我会为每个执行程序配备 4-5 个内核。所以我用 15/5= 3 个执行器测试了性能(我添加了 1 到 14 个以考虑 1 个内核用于在集群模式下运行的驱动程序)。 使用:sparkConf.set 中的 executor.cores、executor.instances 来玩弄配置。

      如果这不会显着提高性能,那么接下来就是查看执行器内存。

      最后,我会调整应用程序逻辑以查看 mapRDD 大小、分区大小和 shuffle 大小。

      【讨论】:

      • :- 感谢您的 cmets。我在代码中将 master 指定为“本地”,因为我无法在此处发布我的公司纱线 URL。实际上我是在纱线集群上运行它.此外,在 hdfs 上写入数据而不是在读取时实现了 14 的并行度。读取时只有一个线程从 SQL db 读取,这使得整个过程非常缓慢。在这种情况下,我认为马可波罗的回答是正确的。这是我的看法。如果我遗漏了什么,请随时纠正我。谢谢。
      • 有多少执行者分配给该工作?您可以使用 spark UI 进行验证吗?
      【解决方案4】:

      以下解决方案对我有帮助

      var df=spark.read.format("jdbc").option("url","
      "url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load()
      df.registerTempTable("tempTable")
      var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly
      
      dfRepart.write.format("parquet").save("hdfs_location")
      

      【讨论】:

        【解决方案5】:

        我遇到了同样的问题,因为您使用的那段代码不适用于分区。

        sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
        

        您可以通过以下方式检查在您的 spark 作业中创建的分区数

        df.rdd.partitions.length
        

        您可以使用以下代码连接数据库:

        sqlContext.read.jdbc(url=db_url,
            table=tableName,
            columnName="ID",
            lowerBound=1L,
            upperBound=100000L,
            numPartitions=numPartitions,
            connectionProperties=connectionProperties) 
        

        要优化您的 Spark 作业,请使用以下参数: 1. 分区数 2. --num-executors 3.--执行器核心 4. --executor-内存 5.--驱动内存 6. 获取大小

        2、3、4 和 5 选项取决于您的集群配置 您可以在 spark ui 上监控您的 spark 作业。

        【讨论】:

          【解决方案6】:

          Sqoop 和 Spark SQL 都使用 JDBC 连接从 RDBMS 引擎获取数据,但 Sqoop 在这方面具有优势,因为它专门用于在 RDBMS 和 HDFS 之间迁移数据。

          Sqoop 中可用的每个选项都经过微调,以便在进行数据摄取时获得最佳性能。

          您可以从讨论控制映射器数量的选项 -m 开始。

          这是从 RDBMS 并行获取数据所需执行的操作。我可以在 Spark SQL 中执行此操作吗? 当然可以,但是开发人员需要处理 Sqoop 自动处理的“多线程”。

          【讨论】:

            猜你喜欢
            • 2014-12-25
            • 1970-01-01
            • 2020-09-04
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2012-05-18
            相关资源
            最近更新 更多