【问题标题】:Use Apache Spark efficiently to push data to elasticsearch高效使用 Apache Spark 将数据推送到 elasticsearch
【发布时间】:2020-08-20 09:10:28
【问题描述】:

我在一个 xml 文件中有 2700 万条记录,我想将其推送到 elasticsearch 索引中 下面是用 spark scala 编写的代码 sn-p,我将创建一个 spark job jar 并在 AWS EMR 上运行

我怎样才能有效地使用火花来完成这个练习?请指导。

我有一个 12.5 gb 的压缩 xml,我正在将其加载到 spark 数据帧中。我是 Spark 的新手..(我应该拆分这个 gzip 文件吗?还是 spark 执行器会处理它?)

class ReadFromXML {

  def createXMLDF(): DataFrame = {
    val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
    import spark.implicits._
    val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)

    var new_df: DataFrame = null
      
      new_df = m_df.select($"CountryCode"(0).as("countryCode"),
        $"PostalCode"(0).as("postalCode"),
        $"state"(0).as("state"),
        $"county"(0).as("county"),
        $"city"(0).as("city"),
        $"district"(0).as("district"),
        $"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
        $"FullStreetName"(0).as("street"),
        functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
        .where($"LocationList.Location._primary" === "true")
        .where("(array_contains(_languageCode, 'en'))")
        .where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
    

    new_df.drop("name")
  }
}

object PushToES extends App {
  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .master("local[*]")
    .config("spark.es.nodes", "awsurl")
    .config("spark.es.port", "port")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.net.ssl", "true")
    .getOrCreate()

  val extractor = new ReadFromXML()

  val df = extractor.createXMLDF()
  df.saveToEs("myindex/_doc")
}

更新 1: 我已将每个文件拆分为 68M,读取这个单个文件需要 3.7 分钟 我试图使用 snappy 而不是 gzip 压缩编解码器 所以将gz文件转换成snappy文件并在config下面添加

.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

但它返回空数据框

df.printschema 只返回“root”

更新 2: 我已经设法以 lzo 格式运行..解压缩和加载数据帧所需的时间非常少。

遍历每个大小为 140 MB 的 lzo 压缩文件并创建数据帧是个好主意吗? 或

我应该在数据框中加载 10 个文件吗? 或

我应该在单个数据帧中加载所有 200 个 lzo 压缩文件,每个 140MB 吗?如果是,那么应该为 master 分配多少内存,因为我认为这将加载到 master 上?

从 s3 存储桶读取文件时,“s3a” uri 可以提高性能吗?还是“s3” uri 可以用于 EMR?

更新 3: 测试一小组 10 个 lzo 文件。我使用了以下配置。 EMR Cluster 总共花费了 56 分钟,从这一步(Spark 应用程序)花费了 48 分钟来处理 10 个文件

1 大师 - m5.xlarge 4 vCore,16 GiB 内存,仅 EBS 存储 EBS 存储:32 GiB

2 核 - m5.xlarge 4 vCore,16 GiB 内存,仅 EBS 存储 EBS 存储:32 GiB

使用以下从 https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ 学习的 Spark 调整参数

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.dynamicAllocation.enabled": "false",
      "spark.driver.memory": "10800M",
      "spark.executor.memory": "10800M",
      "spark.executor.cores": "2",
      "spark.executor.memoryOverhead": "1200M",
      "spark.driver.memoryOverhead": "1200M",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.storage.level": "MEMORY_AND_DISK_SER",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.default.parallelism": "4"
    }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapreduce.map.output.compress": "true"
    }
  }
]

【问题讨论】:

  • Spark 是您任务中的硬约束吗? 12GB 的数据可能不足以开始使用大数据解决方案,也许一个带有基于事件的 XML 解析器的简单控制台工具在您的情况下可以正常工作。您能否详细说明选择 Spark 的原因以及您是否考虑不基于 EMR/Spark 的解决方案?
  • @fenixil 12 GB 是包含 2700 万条记录的 gzip 压缩数据……您能否详细说明还有哪些其他选项可用?如果它适合我​​的需要,我可以切换到它..我可以运行本地 spark 来推送这么多数据吗?..我是大数据的新手
  • stackoverflow.com/questions/16302385/…... Gzip 文件不可拆分... 您还可以检查数据砖 API github.com/databricks/spark-xml 或者您可以编写自定义实现 DataSourceV2 Reader API 来读取 gziip xml并将它们转换为 spark ROW 对象 并在 spark 中注册。
  • @kavetiraviteja 我正在使用 databricks spark-xml,上面的代码适用于 gzip 压缩文件。我担心的是..上面的代码是否需要任何修改才能更有效地使用 spark 功能..因为我是新来的火花,所以我是否正确使用它?或者我认为将 xml 文件拆分成更小的文件可以提高性能?
  • 我认为将 xml 文件拆分成更小的文件可以提高性能?是的,它会,但不会太小......确保将它们分成合适的大小............什么是执行器内存和执行器核心配置。

标签: scala apache-spark elasticsearch apache-spark-sql


【解决方案1】:

以下是我的一些建议。

以 parquet 格式或任何格式读取数据。根据您的需要重新分区。数据转换可能会消耗时间,所以在 spark 中读取它然后处理它。尝试在开始加载之前创建地图和格式数据。这将有助于在复杂地图的情况下轻松调试。

  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .enableHiveSupport()
    .getOrCreate()


val batchSizeInMB=4; // change it as you need
val batchRetryCount= 3
val batchWriteRetryWait = 10
val batchEntries= 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val esNodes = [yourNode1, yourNode2, yourNode3]
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.node"-> esNodes.mkString)(","))
esConfig = esConfig + ("es.port"->port.toString())
esConfig = esConfig + ("es.batch.size.bytes"->(batchSizeInMB*1024*1024).toString())
esConfig = esConfig + ("es.batch.size.entries"->batchEntries.toString())
esConfig = esConfig + ("es.batch.write.retry.count"->batchRetryCount.toString())
esConfig = esConfig + ("es.batch.write.retry.wait"->batchWriteRetryWait.toString())
esConfig = esConfig + ("es.batch.write.refresh"->"false")
if(enableSSL){
esConfig = esConfig + ("es.net.ssl"->"true")
esConfig = esConfig + ("es.net.ssl.keystore.location"->"identity.jks")
esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed"->"true")
}
if (wanOnly){
esConfig = esConfig + ("es.nodes.wan.only"->"true")
}

// This helps if some task fails , so data won't be dublicate
if(enableIdempotentInserts){
  esConfig = esConfig + ("es.mapping.id" ->"your_primary_key_column")
}

val df = "suppose you created it using parquet format or any format"

实际上数据是在执行器级别而不是驱动程序级别插入的 尝试只给每个执行器 2-4 个核心,这样就不会同时打开那么多连接。 您可以根据自己的喜好改变文档大小或条目。请阅读它们。

分块写入数据,这将有助于您将来加载大型数据集 并尝试在加载数据之前创建索引映射。并且更喜欢很少的嵌套数据,因为您在 ES 中具有该功能 我的意思是尽量在你的数据中保留一些主键。

val dfToInsert = df.withColumn("salt", ceil(rand())*10).cast("Int").persist()
for (i<-0 to 10){
val start = System.currentTimeMillis
val finalDF = dfToInsert.filter($"salt"===i)
val counts = finalDF.count()
println(s"count of record in chunk $i -> $counts")
finalDF.drop("salt").saveToES("indexName",esConfig)
val totalTime = System.currentTimeMillis - start
println(s"ended Loading data for chunk $i. Total time taken in Seconds : ${totalTime/1000}")
}

尝试为最终的 DF 提供一些别名,并在每次运行时更新它。因为您不想打扰您的生产服务器 加载时

内存

这不能是通用的。但只是为了给你一个开始

根据您的数据大小或预算保留 10-40 个执行器。保持每个 执行器 8-16gb 大小和 5gb 开销。 (这可能因您的 文件可以大也可以小)。如果需要,保留 maxResultSize 8gb。 驱动程序可以有 5 个内核和 30 克内存

重要的事情。

  • 您需要将配置保存在变量中,因为您可以根据索引进行更改

  • 插入发生在执行程序而不是驱动程序上,所以尽量保持较小 写作时的连接。每个核心将打开一个连接。

  • 文档插入可以是批量输入大小或文档大小。 在进行多次运行时根据您的学习进行更改。

  • 尝试使您的解决方案稳健。它应该能够处理所有尺寸数据。 读取和写入都可以调整,但尝试将数据格式化为 开始加载之前的每个文档图。这将有助于轻松 调试,如果数据文档有点复杂和嵌套。

  • spark-submit 的内存也可以根据您在运行时的学习情况进行调整 工作。只需尝试通过改变内存和批处理来查看插入时间 大小。

  • 最重要的是设计。如果您使用的是 ES 而不是创建 您的地图,同时牢记最终查询和要求。

【讨论】:

  • 可以将所有 200 个 lzo 文件一起加载到一个数据帧中吗?
  • 是的,因为我们稍后使用盐分对数据进行了分块。所以不用担心!
  • 请确认..所有数据会被分成10个chunk然后dump到ES中?加盐时需要repartition
  • 并且盐列值将从0到9?..因为没有盐= 10的记录
  • 是的 10 不会有任何东西。它是一个更正。关于重新分区,我会说密切关注您的 spark UI。我相信在你的情况下,Repartition 只是有助于在所有分区上平均分布你的数据(或者如果我们想以波数而不是单次进行。)首先运行它而不做任何更改,然后在增加它之后测量性能.保持其为总核心数的倍数,并仅尝试高于 200。
【解决方案2】:

不是一个完整的答案,但评论仍然有点长。我想提出一些建议。

目前尚不清楚,但我认为您担心的是执行时间。正如 cmets 中所建议的,您可以通过向集群添加更多节点/执行程序来提高性能。如果在 spark 中加载 gzip 文件时未进行分区,则应将其拆分为合理的大小。 (不要太小 - 这会使处理变慢。不要太大 - 执行程序会运行 OOM)。

parquet 在使用 Spark 时是一种很好的文件格式。如果您可以将 XML 转换为镶木地板。它是超级压缩和轻量级的。

阅读您的 cmets,coalesce 不会完全随机播放。合并算法通过将数据从某些分区移动到现有分区来改变节点的数量。这种算法显然不能增加分区数。请改用repartition。该操作成本高昂,但可以增加分区数量。查看更多事实:https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-04
    • 2020-05-10
    • 2021-10-17
    • 2017-02-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多