【发布时间】:2019-10-01 05:17:49
【问题描述】:
我正在读取 1 GB 的 CSV 文件(记录数:1000 万,列:13)并尝试将其转储到 SQL 服务器中。以下是基础设施细节:
CSV 文件位置:Azure Blob 存储
代码:Spark + Scala
-
用于读取文件并转储的代码:
val df = spark.read.format(fileparser_config("fileFormat").as[String]).option("header", fileparser_config("IsFirstRowHeader").toString).load(fileparser_config("FileName"). as[String]).withColumn("_ID", monotonically_increasing_id)
val bulkCopyConfig = Config(Map( "url" -> connConfig("dataSource").as[String], "databaseName" -> connConfig("dbName").as[String], “用户”-> connConfig(“用户名”).as[字符串], "密码" -> connConfig("密码").as[字符串], “dbTable”-> 表名, "bulkCopyBatchSize" -> "500000", "bulkCopyTableLock" -> "true", "bulkCopyTimeout" -> "600"))
println(s" ${LocalDateTime.now()} ************ sql bulk insert start ************")
df.bulkCopyToSqlDB(bulkCopyConfig)
println(s" ${LocalDateTime.now()} ************ sql bulk insert end ************")
问题:
集群陷入困境,我的工作永远无法完成。有一次,当它运行的时间足够长时,它会抛出一个错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 38.0 failed 4 times, most recent failure: Lost task 13.3 in stage 38.0 (TID 1532, 10.0.6.6, executor 4): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.\n\tat com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:227)\n\tat com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:796)\n\tat com.microsoft.sqlserver.jdbc.SQLServ
- 集群事件日志:
-
其他观察:
- 虽然作业运行了很长时间,但集群并非完全没有响应。我通过在同一窗口中提交更多作业来尝试此操作。作业运行了,但花费的时间比平时多(大约 10 倍)
- 我尝试增加工作节点和节点类型(甚至选择了 128 GB 节点),但结果仍然相同。
- 在作业运行时,我尝试使用 nolock 查询检查 SQL 表行数。我在作业运行的 3-4 分钟后运行了这个,它在表中给了我大约 200 万条记录。但是当我在 10 分钟后再次运行时,查询一直在运行,并且从未返回任何记录。
- 我已尝试调整 bulkCopyBatchSize 属性,但没有太大帮助。
- 我试图删除 sqlinsertion 代码并在我从 1 GB 文件创建的数据帧上使用聚合操作,整个过程只需要 40-50 秒,所以问题只出在 sql driver/sql server。
【问题讨论】:
标签: scala apache-spark apache-spark-sql bulkinsert azure-databricks