【问题标题】:Reading huge CSV file with Spark使用 Spark 读取巨大的 CSV 文件
【发布时间】:2020-08-05 17:36:15
【问题描述】:

我有 27 GB gz csv 文件,我正在尝试使用 Spark 读取该文件。 我们最大的节点有 30 GB 内存。

当我尝试读取文件时,只有一个执行程序正在加载数据(我正在监视内存和网络),其他 4 个已过时。

一段时间后,它由于内存而崩溃。
有没有办法并行读取这个文件?

Dataset<Row> result = sparkSession.read()
                .option("header","true")
                .option("escape", "\"")
                .option("multiLine","true")
                .format("csv")
                .load("s3a://csv-bucket");

result.repartition(10)

spark_conf:
 spark.executor.memoryOverhead: "512"
 spark.executor.cores: "5"

driver:
  memory: 10G

executor:
  instances: 5
  memory: 30G

【问题讨论】:

  • gz 文件不可拆分,因此读取发生在一个执行程序上。将gz 文件读入数据帧后,您可以对其进行重新分区。
  • @VamsiPrabhala 是对的......您必须重新分区以确保数据的均匀分布,否则可能会发生 OOM 或处理速度很慢。
  • 我试图在它到达重新分区之前重新分区失败的数据。我认为未压缩的数据不适合单机
  • 您可以尝试以下网址中给出的解决方案,stackoverflow.com/questions/46638901/…

标签: java scala csv apache-spark


【解决方案1】:

当涉及到大量数据时,您必须重新分区数据

在spark中的并行单位是分区

Dataset<Row> result = sparkSession.read()
                .option("header","true")
                .option("escape", "\"")
                .option("multiLine","true")
                .format("csv")
                .load("s3a://csv-bucket");



result.repartition(5 * 5 *3) ( number of executors i.e.5 * cores i.e. 5 * replicationfactor i.e. 2-3)  i.e. 25 might be working for you to ensure uniform disribution data.

交叉检查每个分区有多少条记录 import org.apache.spark.sql.functions.spark_partition_id yourcsvdataframe.groupBy(spark_partition_id).count.show()

例子:

  val mycsvdata =
    """
      |rank,freq,Infinitiv,Unreg,Trans,"Präsens_ich","Präsens_du","Präsens_er, sie, es","Präteritum_ich","Partizip II","Konjunktiv II_ich","Imperativ Singular","Imperativ Plural",Hilfsverb
      |3,3796784,sein,"","",bin,bist,ist,war,gewesen,"wäre",sei,seid,sein
      |8,1618550,haben,"","",habe,hast,hat,hatte,gehabt,"hätte",habe,habt,haben
      |10,1379496,einen,"","",eine,einst,eint,einte,geeint,einte,eine,eint,haben
      |12,948246,werden,"","",werde,wirst,wird,wurde,geworden,"würde",werde,werdet,sein
    """.stripMargin.lines.toList.toDS
  val csvdf: DataFrame = spark.read.option("header", true)
    .option("header", true)
    .csv(mycsvdata)

  csvdf.show(false)
  println("all the 4 records are in single partition 0 ")

  import org.apache.spark.sql.functions.spark_partition_id
  csvdf.groupBy(spark_partition_id).count.show()

  println( "now divide data... 4 records to 2 per partition")
  csvdf.repartition(2).groupBy(spark_partition_id).count.show()

结果:

 +----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
|rank|freq   |Infinitiv|Unreg|Trans|Präsens_ich|Präsens_du|Präsens_er, sie, es|Präteritum_ich|Partizip II|Konjunktiv II_ich|Imperativ Singular|Imperativ Plural|Hilfsverb|
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
|3   |3796784|sein     |null |null |bin        |bist      |ist                |war           |gewesen    |wäre             |sei               |seid            |sein     |
|8   |1618550|haben    |null |null |habe       |hast      |hat                |hatte         |gehabt     |hätte            |habe              |habt            |haben    |
|10  |1379496|einen    |null |null |eine       |einst     |eint               |einte         |geeint     |einte            |eine              |eint            |haben    |
|12  |948246 |werden   |null |null |werde      |wirst     |wird               |wurde         |geworden   |würde            |werde             |werdet          |sein     |
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+

all the 4 records are in single partition 0 
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|    4|
+--------------------+-----+

now divide data... 4 records to 2 per partition
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   1|    2|
|                   0|    2|
+--------------------+-----+

【讨论】:

  • 我试图在它到达重新分区之前重新分区它失败的数据。我认为未压缩的数据不适合单机
  • 因为 gz 不可拆分 .. csvdf.repartition(200).write.parquet("/path/to/repartitioned/part/file") 然后处理 parquet 也可能需要增加执行器内存还有..
  • 我收到此错误:ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 123950 ms 它重试然后失败
  • 完整堆栈跟踪中的原因原因/原因这只是一个症状
猜你喜欢
  • 1970-01-01
  • 2019-03-25
  • 2021-05-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多