【问题标题】:Performance issue in spark javaspark java中的性能问题
【发布时间】:2018-05-04 18:29:14
【问题描述】:

我使用的是 spark 2.11 版本,我在我的应用程序中只执行了 3 个基本操作:

  1. 从数据库中获取记录:220 万条
  2. 使用包含检查数据库 (220 万) 中存在的文件 (5000) 中的记录
  3. 将匹配的记录写入 CSV 格式的文件

但是对于这 3 项操作,大约需要 20 分钟。如果我在 SQL 中做同样的操作,不到 1 分钟。

我已经开始使用 spark,因为它会很快产生结果,但它需要太多时间。如何提高性能?

第 1 步:从数据库中获取记录。

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "test");
        connectionProperties.put("password", "test##");
        String query="(SELECT * from items)
        dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties);

Step2:使用 contains 检查文件 B (2M) 中存在的文件 A (5k) 的记录

Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner");

Step3:将匹配的记录写入CSV格式的文件

 NewSet.repartition(1).select("*")
        .write().format("com.databricks.spark.csv")
        .option("delimiter", ",")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true")  
        .option("nullValue", "")  
        .save(fileAbsolutePath);

为了提高性能,我尝试了几种设置缓存, 数据序列化

set("spark.serializer","org.apache.spark.serializer.KryoSerializer")),

洗牌时间

sqlContext.setConf("spark.sql.shuffle.partitions", "10"),

数据结构调优

-XX:+UseCompressedOops ,

没有一种方法不会产生更好的性能。

【问题讨论】:

  • 这个用例是否有理由使用 Spark?在我看来,将 5k 条记录写入数据库并在数据库中发出 SQL 连接将是最有效的方法。
  • 我的意思是,将这个查询具体化到 Spark 中需要多长时间:SELECT * from items)?

标签: java performance apache-spark apache-spark-sql


【解决方案1】:

提高性能更像是提高并行度。

并行度取决于 RDD 中的分区数。

确保 Dataset/Dataframe/RDD 的分区数既不太多也不少。

请查看以下可以改进代码的建议。我对 scala 更满意,所以我在 scala 中提供建议。

第一步: 通过提及 numPartitions,确保您可以控制与数据库建立的连接。

连接数 = 分区数。

下面我只是将 10 分配给 num_partitions,你必须调整以获得更高的性能。

  int num_partitions;
  num_partitions = 10;
  Properties connectionProperties = new Properties();
  connectionProperties.put("user", "test");
  connectionProperties.put("password", "test##");
  connectionProperties.put("partitionColumn", "hash_code");
  String query = "(SELECT  mod(A.id,num_partitions)  as hash_code, A.* from items A)";
  dataFileContent = spark.read()
    .jdbc("jdbc:oracle:thin:@//172.20.0.11/devad",
      dbtable = query,
      columnName = "hash_code",
      lowerBound = 0,
      upperBound = num_partitions,
      numPartitions = num_partitions,
      connectionProperties);

You can check how numPartitions works

第二步:

  Dataset<Row> NewSet = source.join(target,
    target.col("ItemIDTarget").contains(source.col("ItemIDSource")),
    "inner");

由于表/数据框之一具有 5k 条记录(少量数据),您可以使用如下所述的广播连接。

import org.apache.spark.sql.functions.broadcast
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key")

第三步: 使用 coalesce 减少分区的数量,以避免完全洗牌。

NewSet.coalesce(1).select("*")
        .write().format("com.databricks.spark.csv")
        .option("delimiter", ",")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true")  
        .option("nullValue", "")  
        .save(fileAbsolutePath);

希望我的回答对你有所帮助。

【讨论】:

  • 不错,fetchsize 选项可能在列表中。此选项影响每次往返获取行数的行为。注意:数字太大可能导致OOM异常。
猜你喜欢
  • 2018-09-01
  • 2016-02-10
  • 1970-01-01
  • 2020-11-10
  • 2015-04-05
  • 2018-08-16
  • 1970-01-01
  • 1970-01-01
  • 2018-02-01
相关资源
最近更新 更多