【发布时间】:2020-08-20 09:10:28
【问题描述】:
我在一个 xml 文件中有 2700 万条记录,我想将其推送到 elasticsearch 索引中 下面是用 spark scala 编写的代码 sn-p,我将创建一个 spark job jar 并在 AWS EMR 上运行
我怎样才能有效地使用火花来完成这个练习?请指导。
我有一个 12.5 gb 的压缩 xml,我正在将其加载到 spark 数据帧中。我是 Spark 的新手..(我应该拆分这个 gzip 文件吗?还是 spark 执行器会处理它?)
class ReadFromXML {
def createXMLDF(): DataFrame = {
val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
import spark.implicits._
val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)
var new_df: DataFrame = null
new_df = m_df.select($"CountryCode"(0).as("countryCode"),
$"PostalCode"(0).as("postalCode"),
$"state"(0).as("state"),
$"county"(0).as("county"),
$"city"(0).as("city"),
$"district"(0).as("district"),
$"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
$"FullStreetName"(0).as("street"),
functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
.where($"LocationList.Location._primary" === "true")
.where("(array_contains(_languageCode, 'en'))")
.where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
new_df.drop("name")
}
}
object PushToES extends App {
val spark = SparkSession
.builder()
.appName("PushToES")
.master("local[*]")
.config("spark.es.nodes", "awsurl")
.config("spark.es.port", "port")
.config("spark.es.nodes.wan.only", "true")
.config("spark.es.net.ssl", "true")
.getOrCreate()
val extractor = new ReadFromXML()
val df = extractor.createXMLDF()
df.saveToEs("myindex/_doc")
}
更新 1: 我已将每个文件拆分为 68M,读取这个单个文件需要 3.7 分钟 我试图使用 snappy 而不是 gzip 压缩编解码器 所以将gz文件转换成snappy文件并在config下面添加
.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
但它返回空数据框
df.printschema 只返回“root”
更新 2: 我已经设法以 lzo 格式运行..解压缩和加载数据帧所需的时间非常少。
遍历每个大小为 140 MB 的 lzo 压缩文件并创建数据帧是个好主意吗? 或
我应该在数据框中加载 10 个文件吗? 或
我应该在单个数据帧中加载所有 200 个 lzo 压缩文件,每个 140MB 吗?如果是,那么应该为 master 分配多少内存,因为我认为这将加载到 master 上?
从 s3 存储桶读取文件时,“s3a” uri 可以提高性能吗?还是“s3” uri 可以用于 EMR?
更新 3: 测试一小组 10 个 lzo 文件。我使用了以下配置。 EMR Cluster 总共花费了 56 分钟,从这一步(Spark 应用程序)花费了 48 分钟来处理 10 个文件
1 大师 - m5.xlarge 4 vCore,16 GiB 内存,仅 EBS 存储 EBS 存储:32 GiB
2 核 - m5.xlarge 4 vCore,16 GiB 内存,仅 EBS 存储 EBS 存储:32 GiB
使用以下从 https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ 学习的 Spark 调整参数
[
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "false"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "false",
"spark.driver.memory": "10800M",
"spark.executor.memory": "10800M",
"spark.executor.cores": "2",
"spark.executor.memoryOverhead": "1200M",
"spark.driver.memoryOverhead": "1200M",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "4"
}
},
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.map.output.compress": "true"
}
}
]
【问题讨论】:
-
Spark 是您任务中的硬约束吗? 12GB 的数据可能不足以开始使用大数据解决方案,也许一个带有基于事件的 XML 解析器的简单控制台工具在您的情况下可以正常工作。您能否详细说明选择 Spark 的原因以及您是否考虑不基于 EMR/Spark 的解决方案?
-
@fenixil 12 GB 是包含 2700 万条记录的 gzip 压缩数据……您能否详细说明还有哪些其他选项可用?如果它适合我的需要,我可以切换到它..我可以运行本地 spark 来推送这么多数据吗?..我是大数据的新手
-
stackoverflow.com/questions/16302385/…... Gzip 文件不可拆分... 您还可以检查数据砖 API github.com/databricks/spark-xml 或者您可以编写自定义实现 DataSourceV2 Reader API 来读取 gziip xml并将它们转换为 spark ROW 对象 并在 spark 中注册。
-
@kavetiraviteja 我正在使用 databricks spark-xml,上面的代码适用于 gzip 压缩文件。我担心的是..上面的代码是否需要任何修改才能更有效地使用 spark 功能..因为我是新来的火花,所以我是否正确使用它?或者我认为将 xml 文件拆分成更小的文件可以提高性能?
-
我认为将 xml 文件拆分成更小的文件可以提高性能?是的,它会,但不会太小......确保将它们分成合适的大小............什么是执行器内存和执行器核心配置。
标签: scala apache-spark elasticsearch apache-spark-sql