【Question title】: Writing to cloud DB2 table using pyspark
【Posted】: 2020-01-24 20:11:15
【Question】:

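I have just started my ETL journey with pyspark. My current goal is to write data from a .csv file to dashdb in append mode. However, I have run into a problem I can't seem to solve. Here is what I have done so far: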

I read the .csv and registered it as a temporary table so that I can run SQL queries against it. The output of the query looks like this:

+--------------------+--------+-------+-----+------+
|              Street|District|Area_m2|Rooms| Price|
+--------------------+--------+-------+-----+------+
|          Angyalföld|    XIII|    105|    2|320000|
|            Belváros|       V|     70|    2|230000|
|         Pozsonyi út|    XIII|     89|    2|290000|
|         Fecske utca|    VIII|     33|    1|130000|
|Margó Tivadar utc...|   XVIII|     80|    2|220000|
|      Orczy út 46-48|    VIII|     44|    2|120000|
|        Vaskapu utca|      IX|     51|1 + 1|185000|
|       Gubacsi út 19|      IX|     30|    1|105000|
|         Öv utca 133|     XIV|     29|    1|150000|
|         Mérleg utca|       V|     54|    2|190000|
|          Szirtes út|       I|    160|    4|389000|
|       Gubacsi út 19|      IX|     50|    2|130000|
|          Török utca|      II|     53|1 + 1|165000|
|          Ferenc tér|      IX|     65|    2|235000|
|       Kiscelli utca|     III|     34|    1|190000|
|     Dózsa György út|     VII|     47|1 + 1|130000|
|         Vadász utca|       V|     60|1 + 1|185000|
|      István utca 7.|     VII|     30|    1|120000|
|          Regős utca|      XI|     53|    2|180000|
|      Országház utca|       I|    122|    3|680000|
+--------------------+--------+-------+-----+------+

The columns in the db2 table have the following types: varchar(50), varchar(10), integer, varchar(10), integer. I stored the output in a variable and used the following code to write it to dashdb:

output.write.jdbc(jdbc_url, table, properties = connection_properties, mode = 'append')

Running this code produces the following error message:

An error occurred while calling o310.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.fd.a(fd.java:404)
    at com.ibm.db2.jcc.am.o.a(o.java:381)
    at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
    at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
    at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
        at com.ibm.db2.jcc.am.fd.a(fd.java:668)
        at com.ibm.db2.jcc.am.fd.a(fd.java:60)
        at com.ibm.db2.jcc.am.fd.a(fd.java:127)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
        at com.ibm.db2.jcc.t4.q.a(q.java:57)
        at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
        at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
        at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
        at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
        ... 17 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:515)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.fd.a(fd.java:404)
    at com.ibm.db2.jcc.am.o.a(o.java:381)
    at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
    at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
    at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
    Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
        at com.ibm.db2.jcc.am.fd.a(fd.java:668)
        at com.ibm.db2.jcc.am.fd.a(fd.java:60)
        at com.ibm.db2.jcc.am.fd.a(fd.java:127)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
        at com.ibm.db2.jcc.t4.q.a(q.java:57)
        at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
        at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
        at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
        at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
        ... 17 more

Can anyone explain what went wrong and how to fix it? Thanks in advance!

【Comments】:

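  • The message tells you that the batch failed because Db2 returned the exception SQL0302N: the value of a host variable in the EXECUTE or OPEN statement is out of range for its corresponding use. Read: ibm.com/support/knowledgecenter/SSEPGG_11.5.0/…. Your code may be causing some unnecessary or incorrect type conversion. Is your source database an on-premises Db2 and your target database in the cloud (dashdb?)? There are also other ways to lift data to the cloud...
  • Hi mao! Thanks for your reply. The source is a .csv file that I put together from a pandas DataFrame. The target is in the cloud. The project is to use pyspark for the initial load into dashdb, and then have another script perform CDC, comparing the current data in the dashdb table with the source data and applying inserts, updates, and deletes. This script only does the initial load into dashdb; it is more of a proof of concept. I am open to other solutions for loading data into dashdb with pyspark.
  • Edit your question to provide details: which operating system is pyspark running on? Make sure the job that connects to Db2 is configured for unicode/utf-8. You are using a very old jdbc3 driver, dating from v9.7 fp4. Consider using a current jdbc4 driver, available from ibm.com/support/pages/db2-jdbc-driver-versions-and-downloads, or from your local DBA if you have one.
  • I am running pyspark on Windows 10. Thanks for the suggestion, I changed the jdbc driver. However, the problem persists. I looked up SQL0302N and changed the output query to cast every column according to the DDL in dashdb. Still no luck. This is how I specified the query: output = spark.sql('SELECT CAST(STREET AS VARCHAR(50)) as STREET, CAST(DISTRICT AS VARCHAR(10)) AS DISTRICT, CAST(AREA_M2 AS INTEGER) AS AREA_M2, CAST(ROOMS AS VARCHAR(10)) AS ROOMS, CAST(PRICE AS INTEGER) AS PRICE from BP_real_estate')
  • @mao I solved the problem by removing the accented characters from the STREET column. Thanks for the tips.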
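
One way to narrow down which rows trigger SQLSTATE=22001 (string data too long for the target column) is to compare each value's UTF-8 byte length with the declared column length. This is only a sketch under an assumption: that the Db2 columns count their length in bytes, in which case each accented character in UTF-8 takes more than one byte. The names `LIMITS`, `utf8_len`, and `find_oversized` are hypothetical, and the second sample row is a synthetic value made up for illustration, not data from the question.

```python
# Hypothetical helper: column limits copied from the DDL described in the question.
LIMITS = {"Street": 50, "District": 10, "Rooms": 10}


def utf8_len(value):
    """Return the number of bytes the value occupies when encoded as UTF-8."""
    return len(str(value).encode("utf-8"))


def find_oversized(rows, limits=LIMITS):
    """Yield (row_index, column, char_len, byte_len) for values over the byte limit."""
    for i, row in enumerate(rows):
        for col, limit in limits.items():
            value = row.get(col)
            if value is None:
                continue
            byte_len = utf8_len(value)
            if byte_len > limit:
                yield i, col, len(str(value)), byte_len


rows = [
    {"Street": "Dózsa György út", "District": "VII", "Rooms": "1 + 1"},
    # Synthetic value for illustration only: 50 characters, but more than 50 bytes.
    {"Street": "Ő" * 50, "District": "IX", "Rooms": "2"},
]

for hit in find_oversized(rows):
    print(hit)
```

The rows here are plain dictionaries so the check runs without a Spark session. On a DataFrame, a similar comparison could probably be written with `pyspark.sql.functions.length` applied to `encode(col, 'UTF-8')`, but that variant is untested here.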

Tags: jdbc pyspark db2 append etl


【Solution 1】:

This problem can be solved by making sure the .csv is UTF-8 encoded and by removing the special characters.
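
A minimal sketch of the "remove special characters" step, using only the Python standard library. The function name `strip_accents` is hypothetical, and this is not necessarily how the asker did it. It decomposes each character with Unicode NFKD normalization and drops the combining marks, so `ö` becomes `o`.

```python
import unicodedata


def strip_accents(text):
    """Decompose accented characters (NFKD) and drop the combining marks."""
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


# Street names taken from the query output shown in the question.
print(strip_accents("Angyalföld"))
print(strip_accents("Öv utca 133"))
```

In pyspark the function could be wrapped with `pyspark.sql.functions.udf` and applied to the STREET column before the write. Note that this changes the stored data; if the original spelling matters, widening the target column may be a better fit, though that was not tried in this thread.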

【Comments】:

  • I am getting the same error when appending data from Scala / Spark to a db2 table. Any help would be appreciated.