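【Posted】: 2020-01-24 20:11:15
【Question】:
I have just started my ETL journey with pyspark. My current goal is to write data from a .csv file to dashdb using append mode. However, I have run into a problem I cannot seem to solve. Here is what I have done so far:
I read the .csv and registered it as a temporary table so that I can run SQL queries against it. The query output looks like this: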
+--------------------+--------+-------+-----+------+
| Street|District|Area_m2|Rooms| Price|
+--------------------+--------+-------+-----+------+
| Angyalföld| XIII| 105| 2|320000|
| Belváros| V| 70| 2|230000|
| Pozsonyi út| XIII| 89| 2|290000|
| Fecske utca| VIII| 33| 1|130000|
|Margó Tivadar utc...| XVIII| 80| 2|220000|
| Orczy út 46-48| VIII| 44| 2|120000|
| Vaskapu utca| IX| 51|1 + 1|185000|
| Gubacsi út 19| IX| 30| 1|105000|
| Öv utca 133| XIV| 29| 1|150000|
| Mérleg utca| V| 54| 2|190000|
| Szirtes út| I| 160| 4|389000|
| Gubacsi út 19| IX| 50| 2|130000|
| Török utca| II| 53|1 + 1|165000|
| Ferenc tér| IX| 65| 2|235000|
| Kiscelli utca| III| 34| 1|190000|
| Dózsa György út| VII| 47|1 + 1|130000|
| Vadász utca| V| 60|1 + 1|185000|
| István utca 7.| VII| 30| 1|120000|
| Regős utca| XI| 53| 2|180000|
| Országház utca| I| 122| 3|680000|
+--------------------+--------+-------+-----+------+
The columns of the db2 table have the following types: varchar(50), varchar(10), integer, varchar(10), integer. I store the output in a variable and write it to dashdb with the following code:
output.write.jdbc(jdbc_url, table, properties = connection_properties, mode = 'append')
Running this code produces the following error message:
An error occurred while calling o310.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure. The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
at com.ibm.db2.jcc.am.fd.a(fd.java:404)
at com.ibm.db2.jcc.am.o.a(o.java:381)
at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
at com.ibm.db2.jcc.am.fd.a(fd.java:668)
at com.ibm.db2.jcc.am.fd.a(fd.java:60)
at com.ibm.db2.jcc.am.fd.a(fd.java:127)
at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
at com.ibm.db2.jcc.t4.q.a(q.java:57)
at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
... 17 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:515)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure. The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
at com.ibm.db2.jcc.am.fd.a(fd.java:404)
at com.ibm.db2.jcc.am.o.a(o.java:381)
at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
at com.ibm.db2.jcc.am.fd.a(fd.java:668)
at com.ibm.db2.jcc.am.fd.a(fd.java:60)
at com.ibm.db2.jcc.am.fd.a(fd.java:127)
at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
at com.ibm.db2.jcc.t4.q.a(q.java:57)
at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
... 17 more
Can anyone explain what is going wrong and how to fix it? Thanks in advance!
【Discussion】:
-
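The message tells you that the batch failed because Db2 returned the exception SQL0302N: the value of a host variable in the EXECUTE or OPEN statement is out of range for its corresponding use. Read: ibm.com/support/knowledgecenter/SSEPGG_11.5.0/…. Your code may be causing some unnecessary or incorrect type conversion. Is your source database a local Db2 and your target database in the cloud (dashdb?)? There are other ways to lift data to the cloud...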
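The suppressed exception in the trace reports SQLSTATE=22001, the standard SQLSTATE for string data right truncation, so one thing worth checking is whether any string is too long for its target column. The sketch below is only an illustration: it assumes the varchar lengths are counted in bytes rather than characters (the thread does not confirm how the table was defined), and the helper names `utf8_len` and `find_oversized` are made up for this example. It compares the character count of a value with its UTF-8 byte count, since each accented Hungarian letter takes two bytes.

```python
def utf8_len(text):
    """Return the number of bytes `text` occupies when encoded as UTF-8."""
    return len(text.encode("utf-8"))


def find_oversized(values, max_bytes):
    """Return (value, char_count, byte_count) for values over max_bytes in UTF-8."""
    return [
        (v, len(v), utf8_len(v))
        for v in values
        if v is not None and utf8_len(v) > max_bytes
    ]


# Street names taken from the sample output in the question.
streets = ["Dózsa György út", "Gubacsi út 19", "Vaskapu utca"]
for street in streets:
    # Accented names show a byte count larger than their character count.
    print(street, len(street), utf8_len(street))
```

Running `find_oversized` over the full STREET column with `max_bytes=50` would list any value that fits in 50 characters but not in 50 bytes, which is the kind of row that could fail under the byte-length assumption above.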
-
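Hi, mao! Thanks for your reply. The source is a .csv file that I put together from a pandas dataframe. The target is in the cloud. The project is to do the initial data load into dashdb with pyspark, and then have another script perform CDC: compare the current data in the dashdb table with the source data and apply inserts, updates and deletes. This script only does the initial load into dashdb, so it is more of a proof of concept. I am open to other solutions for loading data into dashdb with pyspark.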
-
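Edit your question to provide details: which operating system does pyspark run on? Make sure the job that connects to Db2 is configured for unicode/utf-8. You are using a very old jdbc3 driver, dating from v9.7 fp4. Consider using a current jdbc4 driver, available from ibm.com/support/pages/db2-jdbc-driver-versions-and-downloads, or from your local DBA if you have one.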
-
I run pyspark on Windows 10. Thanks for the suggestion, I changed the jdbc driver. However, the problem persists. I looked up SQL0302N and changed the output query to cast every column according to the DDL in dashdb. Still no luck. This is how I specify the query: output = spark.sql('SELECT CAST(STREET AS VARCHAR(50)) AS STREET, CAST(DISTRICT AS VARCHAR(10)) AS DISTRICT, CAST(AREA_M2 AS INTEGER) AS AREA_M2, CAST(ROOMS AS VARCHAR(10)) AS ROOMS, CAST(PRICE AS INTEGER) AS PRICE FROM BP_real_estate')
-
@mao I solved the problem by removing the accented characters from the STREET column. Thanks for the tips.
Tags: jdbc pyspark db2 append etl
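The thread does not show how the accents were removed. One possible way, given here only as a sketch, is to decompose each character with Python's standard `unicodedata` module and drop the combining marks; the function name `strip_accents` is hypothetical and not taken from the poster's code.

```python
import unicodedata


def strip_accents(text):
    """Decompose accented letters and drop the combining marks."""
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


# Street names taken from the sample output in the question.
for street in ["Angyalföld", "Regős utca", "Öv utca 133"]:
    print(street, "->", strip_accents(street))
```

Since the .csv was built from a pandas dataframe, something like `df["Street"] = df["Street"].map(strip_accents)` before writing the file would be one place to apply it, assuming the column is named `Street` and contains no missing values. Note that this changes the stored data: the original spelling of the street names is lost.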